반응형
이전 글: 2022.03.21 - [개발/reactive] - [reactive] 6. refactoring
CompletableFuture
- 비동기 시작 supplyAsync vs runAsync
- supplyAsync: 파라미터로 supplier 인터페이스 받고 반환 값(CompletableFuture <T>) 존재
- runAsync: 파라미터로 runnable 인터페이스 받고 반환 값없음
- 채이닝, 후속작업 thenApply vs thenAccept vs thenRun vs thenCompose vs thenCombine
- thenApply: 데이터를 포함하는 future 반환(CompletableFuture <U>)
- thenAccept: 파라미터로 Comsumer 받고 반환 값없음 (CompletableFuture <Void>)
- thenRun: 파라미터로 runnable 받고 반환 값없음
- thenCompose: 앞 단계의 CompletableFuture을 주고받고 하면서 순차적으로 연결 가능
- thenCombine: 전혀 다른 CompletableFuture을 첫 인자로 받고, 결과를 연산하는 bifunction을 두 번째 인자로 받음
- 데이터 가져오기(blocking) get vs join
- get: checkedException 던지기 때문에 try/catch 처리 필요
- join: uncheckedException 던짐; 예외처리가 내부적
- 감싸기 completedFuture: 이미 완료된 작업이나 정적인 값을 CompletableFuture로 감쌀 때
- 예외 발생 시 exceptionally vs handle
- exceptionally: 예외 발생 시 해당 예외를 받아서 처리 가능
- handle: s, e를 다 받는 bifunction 구현체
- 별도의 스레드 풀에서 작업 시 ~async 메서드
- 다른 스레드가 후속 작업을 진행
- thenApplyAsync
- 여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다렸다가(blocking) 처리하려면 allOf
연습 코드
ExecutorService es = Executors.newFixedThreadPool(10);
//Future 비동기 작업 결과를 담고있는 오브젝트 get -> blocking
//listenable future 콜백; 완료 시점에
//CompletableFuture 비동기 작업의 결과를 간단하게 만들 수
//리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 장점
//병렬성과 동시성에서 CP가 의미있는데, 여러 cpu core 사이에 지연실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능
//completion station? stage? 장점은 코어성능 20% 더 효율적임
//then ~ 이전의 스레드를 사용
//then~async 하면 정책에 따라 새로운 스래드 할당
CompletableFuture
.supplyAsync(() -> {
log.info("run");
//if(1==1) throw new RuntimeException(); //exception으로 감
return 1; //생성
})
.thenCompose(s -> { //compose 하면 completedfutre의 값만 넘김
log.info("then {}", s);
return CompletableFuture.completedFuture(s + 1); //받아서 작업하고 넘기고
})
.thenApply(s3 -> {
log.info("then3 {}", s3);
return s3 * 10; //받아서 작업하고 넘기고
})
.exceptionally(e -> -10) //예외발생하면 복구할 때 사용가능
.thenAcceptAsync(s2 -> log.info("thenn2 {}", s2), es) //받아서 끝 어떤 스래드풀인지 알려줘야
;
log.info("exit");
이전 시간에 작성한 코드를 CompletableFuture 방식으로 바꿔본다. 수정된 부분 위주로 작성한다.
///before
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e3 -> {
dr.setErrorResult(e3.getMessage());
}
);
}, e2-> {
dr.setErrorResult(e2.getMessage());
});
}, e-> {
//throw 하면 어디서 에러가 난지 알 수 없어 비동기라
dr.setErrorResult(e.getMessage());
});
return dr;
}
///after
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
toCF(rt.getForEntity(URL1, String.class, "f1" + idx))
.thenCompose(s -> {
if(1==1) throw new RuntimeException("ERROR");
return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
})
// .thenCompose(s2 -> toCF(myService.work(s2.getBody())))
.thenApplyAsync(s2 -> myService.work(s2.getBody()))
.thenAccept(s3 -> dr.setResult(s3))
.exceptionally(e -> {
dr.setErrorResult(e.getMessage());
return null;
})
;
return dr;
}
//lf -> cf 변환
<T> CompletableFuture<T> toCF(ListenableFuture<T> lf){
CompletableFuture<T> cf = new CompletableFuture<T>(); //작업의 결과를 나타내는거지 비동기 작업자체는 아님
lf.addCallback(s -> {cf.complete(s);}, e -> {cf.completeExceptionally(e);});
return cf;
}
////
@Service
public static class MyService{
//@Async //cf를 쓴다면 동기로 하고 위에서 적용하는것도 방법
public String work(String req){
log.info("myservice {}" , req);
// return new AsyncResult<String>(req + "/asyncwork");
return req + "/asyncwork";
}
}
참고
https://brunch.co.kr/@springboot/267
728x90
반응형
'개발 > reactive' 카테고리의 다른 글
[reactive] 9. Mono (0) | 2022.03.23 |
---|---|
[reactive] 8. webflux (0) | 2022.03.22 |
[reactive] 6. refactoring (0) | 2022.03.21 |
[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.03.18 |
[reactive] 4-2. spring 비동기를 위한 interfaces/classes (0) | 2022.03.17 |