반응형

이전 글: 2022.03.21 - [개발/reactive] - [reactive] 6. refactoring

 

[reactive] 6. refactoring

이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet 오늘은 5강을 이어서 학습한다. 1. default, block..

bangpurin.tistory.com

 

 

CompletableFuture

  • 비동기 시작 supplyAsync vs runAsync
    • supplyAsync: 파라미터로 supplier 인터페이스 받고 반환 값(CompletableFuture <T>) 존재
    • runAsync: 파라미터로 runnable 인터페이스 받고 반환 값없음
  • 채이닝, 후속작업 thenApply vs thenAccept vs thenRun vs thenCompose vs thenCombine
    • thenApply: 데이터를 포함하는 future 반환(CompletableFuture <U>)
    • thenAccept: 파라미터로 Comsumer 받고 반환 값없음 (CompletableFuture <Void>)
    • thenRun: 파라미터로 runnable 받고 반환 값없음
    • thenCompose: 앞 단계의 CompletableFuture을 주고받고 하면서 순차적으로 연결 가능
    • thenCombine: 전혀 다른 CompletableFuture을 첫 인자로 받고, 결과를 연산하는 bifunction을 두 번째 인자로 받음
  • 데이터 가져오기(blocking) get vs join
    • get: checkedException 던지기 때문에 try/catch 처리 필요
    • join:  uncheckedException 던짐; 예외처리가 내부적
  • 감싸기 completedFuture: 이미 완료된 작업이나 정적인 값을 CompletableFuture로 감쌀 때
  • 예외 발생 시 exceptionally vs handle
    • exceptionally: 예외 발생 시 해당 예외를 받아서 처리 가능
    • handle: s, e를 다 받는 bifunction 구현체
  • 별도의 스레드 풀에서 작업 시 ~async 메서드
    • 다른 스레드가 후속 작업을 진행
    • thenApplyAsync
  • 여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다렸다가(blocking) 처리하려면 allOf

 

연습 코드

   ExecutorService es = Executors.newFixedThreadPool(10);
    //Future 비동기 작업 결과를 담고있는 오브젝트 get -> blocking
    //listenable future 콜백; 완료 시점에
    //CompletableFuture 비동기 작업의 결과를 간단하게 만들 수
    //리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 장점
    //병렬성과 동시성에서 CP가 의미있는데, 여러 cpu core 사이에 지연실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능

    //completion station? stage? 장점은 코어성능 20% 더 효율적임
    //then ~ 이전의 스레드를 사용
    //then~async 하면 정책에 따라 새로운 스래드 할당
    CompletableFuture
            .supplyAsync(() -> {
                log.info("run");
                //if(1==1) throw new RuntimeException(); //exception으로 감
                return 1; //생성
            })
            .thenCompose(s -> { //compose 하면 completedfutre의 값만 넘김
                log.info("then {}", s);
                return CompletableFuture.completedFuture(s + 1); //받아서 작업하고 넘기고
            })
            .thenApply(s3 -> {
                log.info("then3 {}", s3);
                return s3 * 10; //받아서 작업하고 넘기고
            })
            .exceptionally(e -> -10) //예외발생하면 복구할 때 사용가능
            .thenAcceptAsync(s2 -> log.info("thenn2 {}", s2), es) //받아서 끝 어떤 스래드풀인지 알려줘야
    ;
    log.info("exit");

 

이전 시간에 작성한 코드를 CompletableFuture 방식으로 바꿔본다. 수정된 부분 위주로 작성한다.

///before
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();

    ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
    f1.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
        f2.addCallback(s2 -> {
            ListenableFuture<String> f3 = myService.work(s2.getBody());
            f3.addCallback(s3 -> {
                dr.setResult(s3);
            }, e3 -> {
                dr.setErrorResult(e3.getMessage());
                    }
            );
        }, e2-> {
            dr.setErrorResult(e2.getMessage());
        });
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });
    return dr;
}
///after
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();

    toCF(rt.getForEntity(URL1, String.class, "f1" + idx))
            .thenCompose(s -> {
                if(1==1) throw new RuntimeException("ERROR");
                return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
            })
          //  .thenCompose(s2 -> toCF(myService.work(s2.getBody())))
            .thenApplyAsync(s2 -> myService.work(s2.getBody()))
            .thenAccept(s3 -> dr.setResult(s3))
            .exceptionally(e -> {
                dr.setErrorResult(e.getMessage());
                return null;
            })
    ;

    return dr;
}

//lf -> cf 변환
<T> CompletableFuture<T> toCF(ListenableFuture<T> lf){
    CompletableFuture<T> cf = new CompletableFuture<T>(); //작업의 결과를 나타내는거지 비동기 작업자체는 아님
    lf.addCallback(s -> {cf.complete(s);}, e -> {cf.completeExceptionally(e);});
    return cf;
}

////
@Service
public static class MyService{
    //@Async //cf를 쓴다면 동기로 하고 위에서 적용하는것도 방법
    public String work(String req){
        log.info("myservice {}" , req);
      //  return new AsyncResult<String>(req + "/asyncwork");
        return req + "/asyncwork";
    }
}

참고

https://brunch.co.kr/@springboot/267

 

CompletableFuture

자바 비동기 프로그래밍을 위한 CompletableFuture 검토 | 필자는 최근에 CompletableFuture 를 사용해서 개발을 진행하였는데, CompletableFuture는 작년에 한번 사용한 이후로는 실무에서 사용할 일이 거의

brunch.co.kr

https://wbluke.tistory.com/50

 

CompletableFuture 톺아보기

CompletableFuture 학습 테스트 안녕하세요! 이번 포스팅에서는 학습 테스트를 통해 CompletableFuture를 알아보는 시간을 가져보려고 합니다. 모든 코드는 GitHub에 있으니 참고하시면 됩니다. CompletableFutur

wbluke.tistory.com

 

728x90
반응형

+ Recent posts