728x90
반응형
728x90
반응형
반응형

reactor를 공부하다 보면 Mono/Flux 이후에 많이 보게 되는 게 Sinks이다.. 솔직히 아직 mono와 flux도 완벽하게 이해하지 못했는데, 자꾸 새로운 개념이 튀어나오는 게 두렵긴 하지만 ㅋㅋ 어쨌건 계속 봐야 모래알 같은 지식이 쌓여 해변가가 될 것이라 믿기에, 짧은 지식으로 나마 원문을 파보도록 한다.

java doc에 기재되어 있는 Sinks의 개념이다. mono/flux보다 한 단계 더 내부에서 실제 동작하는 시그널의 구조체 같은 느낌인 듯 한데, 저것만 봐서 완전히 와닿지 않는다.

https://projectreactor.io/docs/core/release/reference/#processors

 

Reactor 3 Reference Guide

10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3) 1

projectreactor.io

기존에 processor라는 개념이 있는데, publisher이면서 subscriber의 역할을 하는 친구이다. 보통 reactive stream의 중간체(데이터를 구독받았다가 다시 방출하는..)를 가리키는데 reactive stream에서는 publisher로 보통 구현한다(이때 subscriber 인터페이스의 함수를 직접 호출하지 않도록 조심해야 한다).

 In Reactor a sink is a class that allows safe manual triggering of signals.

Sinks는 안전하게 event trigger를 할 수 있게 하는 interface며 processor의 대용으로 사용할 수 있다.

 

 

1. 일반 구독(flux.subscribe)

@Test
@DisplayName("해당 구독행위는 array에 대해서 하였기에 뒤에서 변화가 이루어진 부분에 대해서는 이미 끝난 행위라서 아무런 변화가 없음")
public void flux(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data)); //구독
    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터 변화
}
[a, b, c, d, e]

구독 이후에 발생한 데이터의 변화는 감지하지 않는다. 데이터가 정해져있고 거기에 구독자를 추가한다.

 

2. subscribe를 계속 감지하게 하려면? Processor 사용

Flux의 Processor에는 FluxProcessor, EmitterProcessor, ReplayProcessor 등 많은 프로세서들이 존재하는데, 그중 EmitterProcessor는 여러 구독자(subscriber)가 사용할 수 있는 구독과 발행이 동시해 일어나는 프로세서이다.

EmitterProcessor: An implementation of a message-passing Processor implementing publish-subscribe with synchronous (thread-stealing and happen-before interactions) drain loops.

또한, 구독 행위가 등록되고 난 이후에 해당 이벤트가 발생하면 구독하는 대상에게 데이터를 동기적으로 전달한다.

 

EmitterProcessor를 사용한 코드

@Test
public void beforeSink(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e"}));

    //프로세서 시작 구간.
    EmitterProcessor<List<String>> data = EmitterProcessor.create();  //발행인
    data.subscribe(t -> System.out.println("1번 : "+t));  //구독자 추가
    FluxSink<List<String>> sink = data.sink();   //배달부
    sink.next(array); //배달 완료

    array.addAll(Arrays.asList(new String[]{"new", "data", "hello"}));  //내용 추가

    data.subscribe(t -> System.out.println("2번 : "+t));  //구독자 추가
    sink.next(array);  //배달

    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //내용 추가
    sink.next(array);  //배달
}
1번 : [a, b, c, d, e]
1번 : [a, b, c, d, e, new, data, hello]
2번 : [a, b, c, d, e, new, data, hello]
1번 : [a, b, c, d, e, new, data, hello, 1, 2, 3]
2번 : [a, b, c, d, e, new, data, hello, 1, 2, 3]

Processor는 기존에 있거나 새롭게 등장한 구독자(subscriber)에게 데이터(old data)를 전달한다. 구독자 중심이랄까. 

발행하는 기관(processor)을 건설하고 구독자를 모집(subscribe)한 뒤에 계속해서 발행(sink.next)하는 형태이다. 구독 이후에 데이터가 변경되어 발행되었다면 추가 구독없이 변경된 데이터를 받아볼 수 있다. 

작업 할 내용이 데이터가 중심이면 1번의 일반 구독형태를 사용하고, 구독자가 중심이면 프로세스를 사용하면 될 것 같다.

 

3. EmitterProcessor 부분이 reactor 3.5에서 deprecated 돼서 Sinks로 바꿔본다.

@Test
@DisplayName("flux: hot 테스트")
public void sinkTest_multicast(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e"})); //내용

    Sinks.Many<List<String>> sink = Sinks.many().multicast().directBestEffort(); //발행인
    sink.asFlux().subscribe(data -> System.out.println("1번 : " + data)); //구독자 추가
    sink.tryEmitNext(array); //발행함
    sink.asFlux().subscribe(data -> System.out.println("2번 : " + data)); //구독자 추가
    array.addAll(Arrays.asList(new String[]{"1", "2", "3", "4", "5"})); //내용 추가
    sink.tryEmitNext(array); //발행함
    sink.asFlux().subscribe(data -> System.out.println("3번 : " + data)); //구독자 추가
}
1번 : [a, b, c, d, e]
1번 : [a, b, c, d, e, 1, 2, 3, 4, 5]
2번 : [a, b, c, d, e, 1, 2, 3, 4, 5]

1번은 한번 구독으로 변경된 데이터도 다시 받았고, 2번은 구독 이후 데이터를 받았다. 3번은 구독(subscribe) 이후에 아무 데이터 변경이 없어서 로그에 남지 않는다.

 


 

아래 예제를 따라하면 Sink의 multicast/unicast의 차이, sinks.one/sinks.many를 공부할 수 있다. hot/cold publisher에 대한 개념은 덤!

https://prateek-ashtikar512.medium.com/projectreactor-sinks-bac6c88e5e69

 

ProjectReactor — Sinks

public static <T> Sinks.One<T> one() A Sinks.One that works like a conceptual promise: it can be completed with or without a value at any…

prateek-ashtikar512.medium.com

 

@Test
@DisplayName("n subscribers :: 1 message")
public void sinkOne(){
    Sinks.One<Object> sink = Sinks.one(); //n subscribers :: 1 message
    Mono<Object> mono = sink.asMono();

//        mono.subscribe(d -> System.out.println("Sam: " + d));
    mono.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); //많이 받겠다고 해고 sinks.one 자체가 1개만 방출해서 하나만 받음
            System.out.println(">>>> onSubscribed " + s); // 이거 호출되고..
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });
    mono.subscribe(d -> System.out.println("Sam: " + d));

    sink.tryEmitValue("Hollo");
    sink.tryEmitValue("hi~"); //안받음! one이라서.. 하나만받음
}

@Test
@DisplayName("1 subscriber :: n message")
public void unicast(){
    Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

//        flux.subscribe(d-> System.out.println("Date : "+ d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
            System.out.println(">>>> onSubscribed " + s); // 이거 호출되고..
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });
    flux.subscribe(d-> System.out.println("Date : "+ d));

    sink.tryEmitNext("hi");
    sink.tryEmitNext("i am hungry");
    sink.tryEmitNext("bye");
}

@Test
@DisplayName("sink.many는 갑자기 많은걸 방출하면 FAIL_NON_SERIALIZED 에러가 나서 이때는 재시도를 해줘야 데이터 누수가 없음")
public void unicast2() throws InterruptedException {
    Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

    List<Object> list = new ArrayList<>();
    flux.subscribe(e -> list.add(e));

    for(int i = 0; i< 1000; i++){
        final int j = i;
        CompletableFuture.runAsync(() -> {
            sink.emitNext(j, new Sinks.EmitFailureHandler() {
                @Override
                public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
                    System.out.println(emitResult.toString());
                    //return true; // true if the operation should be retried, false otherwise.
                    // Sinks.many() factory methods that fail with EmitResult.FAIL_NON_SERIALIZED when multiple producers emit concurrently
                    //그래서 FAIL_NON_SERIALIZED 에러일 때는 재시도하도록 해줘야 함
                    return emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
                }
            });
        });
    }

    Thread.sleep(5_000);
    System.out.println(list.size());
}

@Test
@DisplayName("n subscribers :: n message / 구독 이후 발행한 메세지부터 받음 / hot")
public void multicast(){
    Sinks.Many<Object> sink = Sinks.many().multicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

    sink.tryEmitNext("hi");
    sink.tryEmitNext("hello");

    flux.subscribe(d -> System.out.println("SAM: " + d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(5); //총 가져올 갯수 1이면 sink.one과 같은 효과
            System.out.println(">>>> onSubscribed "); // 이거 호출되고 이제부터 데이터 받음
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o); //실 데이터
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });

    sink.tryEmitNext("????");
    flux.subscribe(d-> System.out.println("here: " +d));
    sink.tryEmitNext(" new");
}

@Test
@DisplayName("n subscribers :: n message / 구독 이전 발행 한 메세지도 받음 / cold")
public void manyReplay(){
    Sinks.Many<Object> sink = Sinks.many().replay().all(); //onSubscribe 하고 바로 첨부터하고 그담부터는 리스닝
    Flux<Object> flux = sink.asFlux();

    sink.tryEmitNext("hi");
    sink.tryEmitNext("how are you");

    flux.subscribe(d -> System.out.println("First: " + d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); //총 가져올 갯수 1이면 sink.one과 같은 효과
            System.out.println(">>>> onSubscribed "); // 이거 호출되고 이제부터 데이터 받음
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o); //실 데이터
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });

    sink.tryEmitNext("???");
    flux.subscribe(d -> System.out.println("Demon: " +d));
    sink.tryEmitNext("new");
}
728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webclient] 비슷한데 뭘 써야할지 모르겠는 것들  (0) 2022.04.01
[webflux] block vs toFuture  (0) 2022.03.31
[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25
반응형
  • retrieve vs exchange(exchangeToMono)

retrieve: http status code가 200일 경우 response body처리
http status code가 다른 경우(400, 500, etc.) WebClientResponseException 떨어짐
에러 처리 커스텀하게 하고 싶으면 onStatus 사용

exchange: any response에서도 사용 가능하나 꼭 response 내용을 사용해야 함(성공이건 실패건 상관없이) 아니면 memory leak이 있을 수 있다고..

응답이 200이고 응답 body에 대한 처리만 하고 싶은 경우 retrieve.
이 외에 응답 코드가 4xx, 5xx 등 특정 코드에 따라 다른 결과를 주고 싶거나 응답 헤더를 사용하고 싶은 경우는 exchange를 사용


  • 에러 처리할 때 doOnNext vs flatMap?

둘 다 작동하긴 하지만 함수의 사상 상 doOnNext가 더 적합한 것 같다. 둘 다 두면 위에서 걸려서 아래로 안 흐른다.

public Mono<BaseResponse<String>> getPopo(){
    System.out.println(">> popo " + Thread.currentThread().getName());
    return webClient.get()
            .uri(uriBuilder -> uriBuilder.path("/api/hello/popo").build())
            .retrieve()
            .onStatus(HttpStatus::isError, resp -> Mono.error(new Exception()))
            .subscribeOn(Schedulers.boundedElastic())
           .bodyToMono(new ParameterizedTypeReference<BaseResponse<String>>(){})
//                .doOnNext(res -> {
//                    if(res.getHeader().getStatus() == 200){
//                        throw new Exception("error");
//                    }
//                })
         
            .flatMap(res -> {
                if(res.getHeader().getStatus() == 300){
                    return Mono.error(new Exception("검증실패"));
                    //return Mono.empty();
                }else{
                    System.out.println(">> popo2 " + Thread.currentThread().getName());
                    return Mono.just(res);
                }
            })
            .delayElement(Duration.ofSeconds(10))
            ;

}

 


참고: https://binux.tistory.com/56

 

[Spring Reactive] WebClient

Web Reactive Stack 공식문서의 WebClient 부분을 읽고 해석하며 작성했습니다. Web on Reactive Stack The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for th..

binux.tistory.com

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[reactor] Sinks  (0) 2022.06.13
[webflux] block vs toFuture  (0) 2022.03.31
[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25
반응형

이전 글: 2022.03.30 - [개발/reactive] - [webflux] 실무투입고민

 

[webflux] 실무투입고민

이전 글: 2022.03.25 - [개발/reactive] - [spring] spring-web and spring-webflux [spring] spring-web and spring-webflux spring mvc 기반 spring-web의 restTemplate으로 api를 요청하고 있던 와중, restTemp..

bangpurin.tistory.com

 

현재 실무에서 spring-web + spring-webflux(webClient)인 1번 프로젝트spring-webflux only인 2번 프로젝트를 동시에 작업하고 있다.

비동기를 사용하는 부분은 주로 외부 api call 인 부분이며, 최종 res는 동기로 내려주게 사용 중이다. 

비슷한 로직이어도 두 프로젝트에서 다르게 작동하는 부분이 있어 정리해본다.


1. 비동기로 함수를 실행시키기 위해 block()을 사용

block()/blockFirst()/blockLast() are blocking, which is not supported in thread

1번 프로젝트에서는 spring-mvc 기반이기 때문에 Mono.zip.block()이 가능했지만, 2번 프로젝트에서는 위와 같은 에러가 뜨면서 작동하지 않는다.

2번 프로젝트는 thread blocking이 지원되지 않기 때문에 에러가 난다. 해결방안을 찾아봐도 spring-web을 추가하거나 다른 걸 쓰라고 한다.

대안으로는 Mono.zip.toFuture.get()으로 미래에 완료가 되면 꺼내오는 방식을 사용하면 된다.

 

반대로 1번 프로젝트의 block()을. toFuture(). get()으로 수정한다면?

-> 정상 작동된다.

 

그렇다면 toFuture.get 은 blocking 일까? 왜냐면 두 함수의 동작이 똑같아 보이기 때문이다.

둘 다 subscribes immediately 하기 때문에 그래 보인다!

block()은 스레드를 진짜 블로킹하지만 toFuture은 completableFuture을 리턴하여 작업이 끝날 때까지 기다렸다가 get으로 바디를 꺼내 주는데 즉시 실행하기에 블로킹으로 보인다는.. 것..이라는 것

즉 toFuture가 스레드 관리에 더 flexible 하다.

https://stackoverflow.com/questions/58504527/is-mono-tofuture-blocking

 

Is Mono.toFuture() blocking?

From the Official Documentation of Mono#block() it is said that: Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes ...

stackoverflow.com


Nothing happens until you subscribe… until something does

2. webflux기반인 2번 프로젝트에서는 controller단의 Mono return만으로도 subscribe가 작동한다.

spring-web기반, 혹은 webflux의 서비스나 내부의 함수에서 작동하는 게 아니었다

https://stackoverflow.com/questions/56487429/who-calls-subscribe-on-flux-or-mono-in-reactive-webapplication

 

who calls subscribe on Flux or Mono in reactive webapplication

I am looking at some examples of reactive web applications and i am seeing them like this @RequestMapping(value = "/{id}", method = RequestMethod.GET) @ResponseBody public Mono<Person>

stackoverflow.com

https://github.com/reactor/reactor-netty/blob/db27625064fc78f8374c1ef0af3160ec3ae979f4/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java#L962

 

GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty - GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty

github.com


Nothing happens until you subscribe… until something does

 

3. webflux 기반인 프로젝트에서 애매하게. toFuture(). get()을 결과에 써놓고 아무 데서도 subscribe를 하지 않는다면.. (메인 스레드는 subscribe가 올 때까지 무한) pending이 걸린다..

//컨트롤러
public BaseResponse<UserDetailRes> getUserDetailInfo(@RequestParam String memberValue) {
    return getUserDetailInfoSS(memberValue).toFuture().get();
}

//서비스
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfoSS(String memberValue){
	Mono<String> a = callA(); //webclient 호출
	Mono<Object> b = ...; //webclient 호출
	Mono<Object> c = ...; //webclient 호출

	return Mono.zip(a, b, c).flatMap();
}

여기서 엄청난 고민을 했었는데 두 가지 방법으로 해결할 수 있다.

첫 번째. Mono 함수에서 subscribeOn으로 다른 스레드에게 할당을 해서 해소한다. 근데 이게 온전한 해결방법인지는 모르겠다. 뭔가 얻어걸린듯한..

처음에는 subscribeOn의 추가만으로 정상 작동한다는 게 이해가 안 갔는데,, 다른 스레드가 구독을 한다는 의미로 받아들여지는 것 같다.

여기서 또 특이한 점은 모든 Mono 함수에 subscribeOn을 넣을 필요도 없고 맨 처음 함수에만 넣어도 된다. 심지어 zip에 넣어도 작동한다.

즉 위의 예시에서는 callA()의 함수만 넣어도 b, c도 동일한 스레드로 작동하기 때문에 두 번 넣을 필요가 없다.

단 a는 넣지 않고 b나 c에 넣으면 다시 작동이 안 된다..

형태가 맘에 안 든다면 zip에 넣어도 된다. 이는 해당 job 전체를 별도 스레드 할당하는 것이고 위는 그 api를 별도 스레드로 할당한다는 차이가 있다.

Schedulers.boundedElastic()옵션은 다른 논블로킹 작업들에게 영향 없이 하나의 싱글 스레드에서 블로킹 작업한다는 의미이다. 다른 옵션들도 많이 있다.

//다른 서비스
public Mono<String> callA() {
	return webclient.get()
            .uri()
            .retrieve()
            .bodyToMono(생략)
            .subscribeOn(Schedulers.boundedElastic()) //스레드 할당
	;
}

//서비스
//subscribeOn을 어디서 호출하는게 좋을지 고민이 필요하다
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfoSS(String memberValue){
	Mono<String> a = callA(); //webclient 호출
	Mono<Object> b = callB().subscribeOn(Schedulers.boundedElastic()) //webclient 호출
	Mono<Object> c = ...; //webclient 호출

	return Mono.zip(a, b, c)
        .flatMap()
        .subscribeOn(Schedulers.boundedElastic())
    ;
}

 

두 번째. 아까 위에서 언급한, 'webflux의 경우 controller return에서 subscribe를 해준다'를 응용한다!

subscribeOn은 지우고(물론 별도 스레드를 할당해서 할 거면 둬야지) mono로 리턴하게 바꾼다.

//컨트롤러
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfo(@RequestParam String memberValue) {
    return getUserDetailInfoSS(memberValue);
}

//서비스
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfoSS(String memberValue){
	Mono<String> a = callA(); //webclient 호출
	Mono<Object> b = ...; //webclient 호출
	Mono<Object> c = ...; //webclient 호출

	return Mono.zip(a, b, c).flatMap();
}

이렇게 명료한 것을.. spring-web의 block의 늪에 빠져 큰 그림을 못 본 점 + 서비스에서 모노 리턴도 subscribe가 되는 거 아닌가 생각했던 안일한 생각으로 인해 돌고 돌아서 도착한 것 같다.

 


참고

https://projectreactor.io/docs/core/release/reference/#getting-started

 

Reactor 3 Reference Guide

10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3) 1

projectreactor.io

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[reactor] Sinks  (0) 2022.06.13
[webclient] 비슷한데 뭘 써야할지 모르겠는 것들  (0) 2022.04.01
[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25
반응형

이전 글: 2022.03.25 - [개발/reactive] - [spring] spring-web and spring-webflux

 

[spring] spring-web and spring-webflux

spring mvc 기반 spring-web의 restTemplate으로 api를 요청하고 있던 와중, restTemplate가 deprecated 된다는 말을 듣고, 대체제인 webClient를 사용해보려고 공부를 하다 보니 webflux의 영역에 들어와 버렸다...

bangpurin.tistory.com

springboot2

springboot2를 사용하게 되면 서블렛 기반으로 갈지 리엑티브 기반으로 갈지 고민하게 된다. 예전 강의나 자료에서는 둘 중 하나만 골라서 얹어야 한다고 그러는데, 요즘에는 둘 다 얹어서 사용 가능하다(정확히 말하자면 둘 다 있으면 서블랫 기반으로 돌고 webflux의 몇몇 라이브러리를 사용할 수 있다)

새로운 프로젝트를 하기로 했고, 역시나 새로운 기술의 도입의 유혹에서 벗어나지 못하고 있다. spring-web기반으로만 작업해보아서 webflux를 사용해보려고 하는데,, 하면서 이게 맞는지 확신이 안서는데, 빠르게 결정을 해야 하는 상황을 맞닥뜨렸다. 다음은 고민의 일지이다.

 

1. web/webflux를 동시 사용해보자. 딱히 소스가 달라지는건 없지만 restTemplate가 없어진다는데 그거라도 보완하는 거야..

즉 spring-mvc(톰캣)를 사용하면서 api 요청 부분은 webClient로 비동기 처리, 디비는 동기

-> 어드민 백엔드와 같이 단순 CRUD 일 경우 활용하기로 하였다.

implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-webflux"

implementation "org.springframework.boot:spring-boot-starter-data-jpa"

implementation "org.springdoc:springdoc-openapi-ui:${swaggerVersion}"
implementation "org.springdoc:springdoc-openapi-webflux-core:${swaggerVersion}"

 

2. 너무 달라지는 게 없는 것 같으니까 spring-webflux로만 개발해보자, 근데 아직 R2DBC는 개발 장벽이 너무 커, 그리고 아직 DB까지 비동기로 진행할 일도 별로 없어..

즉 spring-webflux(netty)를 메인으로 사용하되 디비는 동기방식 사용

-> 테스트해보니 몇몇 필터만 수정해주면 spring-mvc방식으로 개발했던 코드도 동일하게 작동하였다. 게다가 reactive 공부하면서 바로바로 적용 가능하니 api서버 개발 시 시도해볼 만한 스택이다.

implementation "org.springframework.boot:spring-boot-starter-webflux"
implementation "org.springframework.boot:spring-boot-starter-data-jpa"
implementation "org.springdoc:springdoc-openapi-webflux-ui:${swaggerVersion}"

spring-webflux가 controller -> service -> repository 구조도 지원하지만(annotation 기반)

router -> handler/service -> repository 구조가 진짜 모습이다.(함수형 프로그래밍 모델 기반)

하지만 진짜 webflux의 기능을 쓰지 못하기 때문에 성능상 이점이 없을 듯하여 굳이 이렇게 사용하지 않아도 될 것 같다.

게다가 webflux 디펜덴시만 있으면 스웨거나 filter 설정 등 기존 로직에 영향이 있을 수 있다.

고생에 비해 성과가 미미할 수 있으니 고민이 필요하다.

 

3. webflux with r2dbc.. 디비까지 모든 걸 다 리액티브로 바꾸자니 소스에 하나라도 블로킹 걸리면 의미가 없으니 조심해야 하고, 개발 장벽도 있고 무엇보다 혼자 하는 게 아니라 팀으로 개발하니 팀원들의 동의도 구해야 하고.. 개발 속도도 안 날 것 같으니 우선 보류!

디비마저도 비동기로 처리하는 설정, 궁극적으로 지향해야 하는 부분이지만 일반적인 api에는 이렇게까지 사용할 필요는 없을 것 같다. 진짜 콜이 많거나 백그라운드에서 동작하는 것들이 많을 때(배치성 업무) 효력이 좋을 것 같다.

implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
//implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
//implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
//implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
//implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug")
implementation("org.springdoc:springdoc-openapi-webflux-ui:$openApiVersion")
implementation("dev.miku:r2dbc-mysql")

router, handler, r2dbc 관련 처리에 익숙하면 해볼 만한데 아직 미숙하여 100프로 전환은 아직 무리가 아닐까 싶다.

 

위 설정에서 조심해야 하는 게 webflux의 유무에 따른 swagger dependency도 달라져야 한다는 건데, 까딱하다가는 스웨거가 안 나올 수 있으니 꼭 같이 확인해야 한다.

 

우선 시험 삼아 어드민 백엔드에는 spring-web, spring-webflux /springfox3.0 swagger의 형태로 유지,

api 서버에는 spring-web을 드러내고 spring-webflux / springdoc3 swagger만 둔 형태로 유지해 볼 생각이다. 약간 모험이긴 한데 공부를 하고 알아갈 때마다 바로바로 개선할 수 있다는 이점이 있달까..


참고

https://www.baeldung.com/spring-mvc-async-vs-webflux

https://marrrang.tistory.com/4

 

Spring Webflux에 대하여

Spring webflux에 대해 찾아보고 정리한 글입니다. 일부만 정리되어 있어요, 참고 부탁드려요 ㅎㅎ Spring webflux Spring MVC vs Spring webflux? spring MVC는 Servlet spec에 기반하여 만들어져 있고 Servlet A..

marrrang.tistory.com

https://dreamchaser3.tistory.com/13

 

Spring Webflux - Functional Endpoints

개요 Spring WebFlux는 WebFlux.fn이라는 요청을 라우팅하고 처리하는 경량의 함수형 프로그래밍 모델을 포함하고 있다. 물론 Webflux는 기존 MVC처럼 어노테이션 기반의 프로그래밍 모델도 지원하기 때

dreamchaser3.tistory.com

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webclient] 비슷한데 뭘 써야할지 모르겠는 것들  (0) 2022.04.01
[webflux] block vs toFuture  (0) 2022.03.31
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25
[reactive] 9. Mono  (0) 2022.03.23
반응형

spring mvc 기반 spring-web의 restTemplate으로 api를 요청하고 있던 와중, restTemplate가 deprecated 된다는 말을 듣고, 대체제인 webClient를 사용해보려고 공부를 하다 보니 webflux의 영역에 들어와 버렸다. 물론 webClient도 sync call을 지원하지만 수많은 api 콜을 비동기로 하면 자연스레 내 api의 속도도 빨라질 것이 아닌가? 위기를 기회로 전환하며 새로운 아키텍처를 익히려고 spring-webflux를 추가하였다.

그런데 spring reactive강의를 듣던 도중, 두 dependency는 spring context의 혼란을 야기하므로 같이 사용하면 안 된다는 말을 들었다. 오래된 강의긴 했지만 나름 스프링 저명인사가 말한 것이기에 안되리라 생각하고 좌절하며 관련 내용을 더 찾아보기로 했다.

 

1. spring mvc vs spring webflux

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#web

spring mvc가 있으면 mvc가 우선순위가 높고 webflux만 있으면 webflux를 쓴다고 한다. 즉, 둘 다 있으면 mvc가 우선 순위다.

근데 애초에 같이 있어도 된다는 전제를 한다면 같이 써도 된다는 것 아닌가?!

 

2. spring mvc with webclient

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

스프링 공식문서에 spring mvc + webclient형식으로 같이 써도 된다는 글이 있다.. 하하하하하 살았다.

 

그리고 스프링에서 같이 쓰는 동영상을 올린 적이 있다(물론 주내용은 springboot2에 대한 데모지만)

26분부터~ restTemplate -> webClient 바꾸는 내용 하면서 언급함.

 

spring mvc에 webClient를 써야 하는가에 대한 많은 토론

https://stackoverflow.com/questions/51953826/resttemplate-vs-webclient-benefits-in-servlet-based-web-mvc-app

 

RestTemplate vs WebClient benefits in Servlet based web-mvc app

I'm looking for a clarification on the bolded text in the statement below (I've provided the full paragraph for context only): The RestTemplate is not a good fit for use in non-blocking applicat...

stackoverflow.com

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webflux] block vs toFuture  (0) 2022.03.31
[webflux] 실무투입고민  (0) 2022.03.30
[reactive] 10. Flux  (0) 2022.03.25
[reactive] 9. Mono  (0) 2022.03.23
[reactive] 8. webflux  (0) 2022.03.22
반응형

이전 글: 2022.03.23 - [개발/reactive] - [reactive] 9. Mono

 

[reactive] 9. Mono

이전 글: 2022.03.22 - [개발/reactive] - [reactive] 8. webflux [reactive] 8. webflux 이전 글: 2022.03.21 - [분류 전체보기] - [reactive] 7. CompletableFuture [reactive] 7. CompletableFuture 이전 글:..

bangpurin.tistory.com

 

오늘은 flux의 간단한 사용법을 배운다.

 A Flux object represents a reactive sequence of 0.. N items,
while a Mono object represents a single-value-or-empty (0.. 1) result.

Mono와 Flux의 차이를 검색해보면, mono는 하나의 값을 return할return 할 때 쓰이는 반면 flux는 n개의 값을 return 할 때 쓰인다고 나온다. 그런데 사실 두 차이를, 어쩌면 그 의미를 잘 모르겠다. 아래의 예시에서는 같은 결과를 mono/flux로 각각 작성 가능하다는 것을 보여준다. 차이는 flux를 쓰면 flux라이브러리의 다양한 오퍼레이션 함수를 사용할 수 있다고 하는데, 데이터를 수정 없이 그대로 내린다면 mono나 flux나 그게 그거 아닌가..?

@GetMapping("/event/{id}")
Mono<List<Event>> hello(@PathVariable long id){
    //return Mono.just(new Event(id, "event "+ id));
    List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
    //데이터를 컬랙션으로 묶어서 다루고, 각 데이터 다루거나 편집할 때 힘듦
    return Mono.just(list);
}

@GetMapping(value = "/events")
Flux<Event> events(){
    List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
    //data stream -> .map 등 사용 가능
    return Flux.fromIterable(list);
    //같은 결과
    //return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}

@Data
@AllArgsConstructor
public static class Event{
    long id;
    String value;
}

위와 같이 작성하고 두 api를 요청했을 때, 같은 응답 결과가 내려온다.

toby1 % curl localhost:8080/events
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]                                                                                                                                             nhn@AL01590036 toby1 % curl localhost:8080/events

 

events의 데이터 타입을 바꾸면

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
    List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
    //data stream -> .map 등 사용 가능
    return Flux.fromIterable(list);
    //return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}
toby1 % curl localhost:8080/events
data:{"id":1,"value":"event1"}

data:{"id":2,"value":"event2"}

데이터가 나눠서 들어오는 것을 볼 수 있다.

자바 8의 stream의 기능을 써서 조금 더 수정해보면 아래와 같다.

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
    //데이터생성 using stream
    //Stream<Event> s = Stream.generate(() -> new Event(System.currentTimeMillis(), "val"));
    //data stream -> .map 등 사용 가능; 위와 동일한 결과
    //return Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "val")).limit(10));
    return Flux //server sent event
            .fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "val")))
            .delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
            .take(10);//10개의 request를 보내고 다 오면 cancel 처리
}

stream을 드러내고 flux 기능만을 사용하면 아래와 같겠다.

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
   return Flux //server sent event
            //.range(1, 10) 앞에서 미리 정하고 들어갈 수도 있음;
            //.<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value")))//데이터를 계속 흘러서 보내는 역할; type hint 줘야함
            //상태값을 바꿔서 리턴; 초기상태, 상태 바꿔주는 함수 그담 상태 리턴
            .<Event, Long>generate(()->1L, (id, sink) -> {
                sink.next(new Event(id, "value" + id));//값을 받아 이벤트를 생성해서 싱크로 보내고
                return id+1;//다음 상태 리턴; id로 들어가겟지
            })
            .delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
            .take(10);//10개의 request를 보내고 다 오면 cancel; 뒤에서 끊는 개념
}

flux의 zip을 활용하면 아래 두 가지 방법이 가능하다.

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
   Flux<Event> es = Flux //server sent event
            //상태값을 바꿔서 리턴; 초기상태, 상태 바꿔주는 함수 그담 상태 리턴
            .<Event, Long>generate(()->1L, (id, sink) -> {
                sink.next(new Event(id, "value" + id));//값을 받아 이벤트를 생성해서 싱크로 보내고
                return id+1;//다음 상태 리턴; id로 들어가겟지
            })
            //.delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
            //.take(10);//10개의 request를 보내고 다 오면 cancel; 뒤에서 끊는 개념
    ;
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
    //결합해서 delay의 효과를 볼 수 있음
    return Flux.zip(es, interval).map(tu -> tu.getT1());
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
    Flux<String> es = Flux.generate(sink -> sink.next("value"));
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); //0부터 시작
    //event 조합
    return Flux.zip(es, interval).map(tu -> new Event(tu.getT2(), tu.getT1())).take(10);
}

비슷한 내용 테스트

https://www.devkuma.com/docs/spring-webflux/

 

Spring WebFlux의 간단한 사용법

여기서는 Spring WebFlux의 간단한 사용법에 대해 소개하겠다. Spring WebFlux는 Spring 5에서 추가된 논블로킹(Non-Blocking) 런타임에서 리액티브 프로그래밍을 할 수 있는 새로운 Web 프레임워크이다. 위의

www.devkuma.com

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 9. Mono  (0) 2022.03.23
[reactive] 8. webflux  (0) 2022.03.22
[reactive] 7. CompletableFuture  (0) 2022.03.21
반응형

이전 글: 2022.03.22 - [개발/reactive] - [reactive] 8. webflux

 

[reactive] 8. webflux

이전 글: 2022.03.21 - [분류 전체보기] - [reactive] 7. CompletableFuture [reactive] 7. CompletableFuture 이전 글: 2022.03.21 - [개발/reactive] - [reactive] 6. refactoring [reactive] 6. refactoring..

bangpurin.tistory.com

 

 

mono

//publisher -> publisher ... -> subscriber 시 실행

m = mono
    .just(genHello())  //미리 준비하는.. 그 안의 함수도 미리 실행해서 준비해둠 genHello -> mono
    .fromSupplier(()-> genHello()) //mono -> genHello 람다식을 던져야 함수가 그 때 실행해

    //(==) 람다식의 의미는 아래와 같음
    .fromSupplier(new Supplier<String>(){
        public String get(){
            return genHello()
        }
    })

m.subscribe();
log.info("pos2")
return m; //spring 이 subscribe

 

위 처럼 subscribe가 중간에 있다면 어떻게 실행될까
: 두 번 실행됨
subscribe -> pos2 -> subscribe

같은 내용이라 두 번 실행을 막고 싶으면,
m.subscribe 내용을 따로 담고 그 내용을 Mono.just로 감싸서 return 하면 됨

모노 컨테이너 안의 데이터를 빼야겠어
 = mono.block
block 내부에서 subscribe가 실행되면서 내용을 꺼냄

 

hot publisher vs cold publisher

  • cold는 replay 데이터를 처음부터 다시 생성해줌
  • hot은 구독하는 시점부터 데이터를 줌; 처음부터 주는거 아님


subscribe vs block?

  • subscribe은 스케줄러(쓰레드 모드) 설정에 따라서 블록을 할 수도, 안 할 수도 있음. 결과는 람다식으로 받기 때문에 반드시 블록하지 않아도 됨. 
  • 반면에 block은 결과를 람다식에서 콜백으로 받는 게 아니라, 값을 꺼내서 변수 등으로 넘겨야하기 때문에 해당 쓰레드는 블록됨.

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25
[reactive] 8. webflux  (0) 2022.03.22
[reactive] 7. CompletableFuture  (0) 2022.03.21
[reactive] 6. refactoring  (0) 2022.03.21
반응형

이전 글: 2022.03.21 - [분류 전체보기] - [reactive] 7. CompletableFuture

 

[reactive] 7. CompletableFuture

이전 글: 2022.03.21 - [개발/reactive] - [reactive] 6. refactoring [reactive] 6. refactoring 이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet [reactive..

bangpurin.tistory.com

 

 

저번 시간에 작성한 application을 webflux로 바꿔보는 작업을 진행한다.

springboot2.6.4로 진행하였기 때문에 유튜브와 다르게 아래의 의존성을 추가했다. 

implementation 'org.springframework.boot:spring-boot-starter:2.6.4'
//   implementation 'org.springframework.boot:spring-boot-starter-web:2.6.4'
implementation "org.springframework.boot:spring-boot-starter-webflux:2.6.4"

spring-webflux는 기본 서버가 netty라서 모두 네티로 작업하였다.

그리고 설정명이 달라서 수정하였다. 아래 소스를 참고!

@Slf4j
@EnableAsync
@SpringBootApplication
public class Application {

    @RestController
    public static class MyController{

        @Autowired
        MyService myService;

        WebClient client = WebClient.create();

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        @GetMapping("/rest")
        public Mono<String> rest(int idx) {
            //이것만으로는 api 쏘지 않음
            Mono<ClientResponse> r = client.get()
                    .uri(URL1, idx)
                    .exchange();
            //container안의 원소를 받아서 변환해서 다시 container(모노) 담아서 리턴 Mono<Mono<String>>
            //이중으로 감싸주는 것은 원하는 결과가 아니니 flatmap으로 작업해서 하나로 합쳐야 함
            Mono<String> body = r
                    .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))  //Mono<String>
                    .doOnNext(c -> log.info(c))
                    .flatMap(res1 -> client.get().uri(URL2, res1).exchange())            //Mono<ClientResponse>
                    .flatMap(c -> c.bodyToMono(String.class))                            //Mono<String>
                    .doOnNext(c -> log.info(c))
                    .flatMap(res2 -> Mono.fromCompletionStage(myService.work(res2)))     //completable<String> -> mono<String>
                    .doOnNext(c -> log.info(c))
                    ;

            //return 시 모노면 그때 subscribe 수행함
            //mono subscribe 를 spring이 실행해줌
            return body;
        }

    }

    @Service
    public static class MyService{
        //또다른 스래드 비동기 작업 시 async
        @Async
        public CompletableFuture<String> work(String req){
            return CompletableFuture.completedFuture(req + "/asyncwork");
        }
    }

    public static void main(String[] args) {
        System.setProperty("reactor.netty.ioWorkerCount", "1");
        System.setProperty("reactor.netty.pool.maxConnections", "2000");
        SpringApplication.run(Application.class, args);
    }
}

참고로 위 코드는 아래 코드를 수정한 것이다.

소스를 살펴보면 mono의 map과 flatmap의 사용 차이를 느낄 수 있을 것이다.

@GetMapping("/rest")
public Mono<String> rest(int idx) {
    Mono<ClientResponse> r = client.get()
            .uri(URL1, idx)
            .exchange();

    Mono<String> body = r
            .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))  //Mono<String>
            .flatMap(res1 -> client.get().uri(URL2, res1).exchange())            //Mono<ClientResponse>
            .flatMap(c -> c.bodyToMono(String.class))                            //Mono<String>
            .map(res2 -> myService.work(res2))
            ;

    return body;
}
////////////////////////

@Service
public static class MyService{
    public String work(String req){
        log.info("myservice {}" , req);
        return req + "/asyncwork";
    }
}

 

map과 flatmap의 차이는 아래 링크에서 자세히 확인 가능하다. 스트림으로서의 차이는 이해했는데 동기/비동기에 대해서는 아직 잘 모르겠다.

https://www.geeksforgeeks.org/difference-between-map-and-flatmap-in-java-stream/

 

Difference Between map() And flatMap() In Java Stream - GeeksforGeeks

A Computer Science portal for geeks. It contains well written, well thought and well explained computer science and programming articles, quizzes and practice/competitive programming/company interview Questions.

www.geeksforgeeks.org

https://luvstudy.tistory.com/95

 

Reactor map, flatMap method는 언제 써야할까?

webflux로 서비스를 만들어보면서 map과 flatMap을 언제 써야 할지 헷갈릴 때가 있어 공부한 내용을 정리함. map과 flatMap은 둘 다 스트림의 중간에 값을 변환해주는 역할을 한다. map은 1 : 1로 반환을 보

luvstudy.tistory.com

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[reactive] 10. Flux  (0) 2022.03.25
[reactive] 9. Mono  (0) 2022.03.23
[reactive] 7. CompletableFuture  (0) 2022.03.21
[reactive] 6. refactoring  (0) 2022.03.21
[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet  (0) 2022.03.18
반응형

이전 글: 2022.03.21 - [개발/reactive] - [reactive] 6. refactoring

 

[reactive] 6. refactoring

이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet 오늘은 5강을 이어서 학습한다. 1. default, block..

bangpurin.tistory.com

 

 

CompletableFuture

  • 비동기 시작 supplyAsync vs runAsync
    • supplyAsync: 파라미터로 supplier 인터페이스 받고 반환 값(CompletableFuture <T>) 존재
    • runAsync: 파라미터로 runnable 인터페이스 받고 반환 값없음
  • 채이닝, 후속작업 thenApply vs thenAccept vs thenRun vs thenCompose vs thenCombine
    • thenApply: 데이터를 포함하는 future 반환(CompletableFuture <U>)
    • thenAccept: 파라미터로 Comsumer 받고 반환 값없음 (CompletableFuture <Void>)
    • thenRun: 파라미터로 runnable 받고 반환 값없음
    • thenCompose: 앞 단계의 CompletableFuture을 주고받고 하면서 순차적으로 연결 가능
    • thenCombine: 전혀 다른 CompletableFuture을 첫 인자로 받고, 결과를 연산하는 bifunction을 두 번째 인자로 받음
  • 데이터 가져오기(blocking) get vs join
    • get: checkedException 던지기 때문에 try/catch 처리 필요
    • join:  uncheckedException 던짐; 예외처리가 내부적
  • 감싸기 completedFuture: 이미 완료된 작업이나 정적인 값을 CompletableFuture로 감쌀 때
  • 예외 발생 시 exceptionally vs handle
    • exceptionally: 예외 발생 시 해당 예외를 받아서 처리 가능
    • handle: s, e를 다 받는 bifunction 구현체
  • 별도의 스레드 풀에서 작업 시 ~async 메서드
    • 다른 스레드가 후속 작업을 진행
    • thenApplyAsync
  • 여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다렸다가(blocking) 처리하려면 allOf

 

연습 코드

   ExecutorService es = Executors.newFixedThreadPool(10);
    //Future 비동기 작업 결과를 담고있는 오브젝트 get -> blocking
    //listenable future 콜백; 완료 시점에
    //CompletableFuture 비동기 작업의 결과를 간단하게 만들 수
    //리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 장점
    //병렬성과 동시성에서 CP가 의미있는데, 여러 cpu core 사이에 지연실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능

    //completion station? stage? 장점은 코어성능 20% 더 효율적임
    //then ~ 이전의 스레드를 사용
    //then~async 하면 정책에 따라 새로운 스래드 할당
    CompletableFuture
            .supplyAsync(() -> {
                log.info("run");
                //if(1==1) throw new RuntimeException(); //exception으로 감
                return 1; //생성
            })
            .thenCompose(s -> { //compose 하면 completedfutre의 값만 넘김
                log.info("then {}", s);
                return CompletableFuture.completedFuture(s + 1); //받아서 작업하고 넘기고
            })
            .thenApply(s3 -> {
                log.info("then3 {}", s3);
                return s3 * 10; //받아서 작업하고 넘기고
            })
            .exceptionally(e -> -10) //예외발생하면 복구할 때 사용가능
            .thenAcceptAsync(s2 -> log.info("thenn2 {}", s2), es) //받아서 끝 어떤 스래드풀인지 알려줘야
    ;
    log.info("exit");

 

이전 시간에 작성한 코드를 CompletableFuture 방식으로 바꿔본다. 수정된 부분 위주로 작성한다.

///before
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();

    ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
    f1.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
        f2.addCallback(s2 -> {
            ListenableFuture<String> f3 = myService.work(s2.getBody());
            f3.addCallback(s3 -> {
                dr.setResult(s3);
            }, e3 -> {
                dr.setErrorResult(e3.getMessage());
                    }
            );
        }, e2-> {
            dr.setErrorResult(e2.getMessage());
        });
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });
    return dr;
}
///after
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();

    toCF(rt.getForEntity(URL1, String.class, "f1" + idx))
            .thenCompose(s -> {
                if(1==1) throw new RuntimeException("ERROR");
                return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
            })
          //  .thenCompose(s2 -> toCF(myService.work(s2.getBody())))
            .thenApplyAsync(s2 -> myService.work(s2.getBody()))
            .thenAccept(s3 -> dr.setResult(s3))
            .exceptionally(e -> {
                dr.setErrorResult(e.getMessage());
                return null;
            })
    ;

    return dr;
}

//lf -> cf 변환
<T> CompletableFuture<T> toCF(ListenableFuture<T> lf){
    CompletableFuture<T> cf = new CompletableFuture<T>(); //작업의 결과를 나타내는거지 비동기 작업자체는 아님
    lf.addCallback(s -> {cf.complete(s);}, e -> {cf.completeExceptionally(e);});
    return cf;
}

////
@Service
public static class MyService{
    //@Async //cf를 쓴다면 동기로 하고 위에서 적용하는것도 방법
    public String work(String req){
        log.info("myservice {}" , req);
      //  return new AsyncResult<String>(req + "/asyncwork");
        return req + "/asyncwork";
    }
}

참고

https://brunch.co.kr/@springboot/267

 

CompletableFuture

자바 비동기 프로그래밍을 위한 CompletableFuture 검토 | 필자는 최근에 CompletableFuture 를 사용해서 개발을 진행하였는데, CompletableFuture는 작년에 한번 사용한 이후로는 실무에서 사용할 일이 거의

brunch.co.kr

https://wbluke.tistory.com/50

 

CompletableFuture 톺아보기

CompletableFuture 학습 테스트 안녕하세요! 이번 포스팅에서는 학습 테스트를 통해 CompletableFuture를 알아보는 시간을 가져보려고 합니다. 모든 코드는 GitHub에 있으니 참고하시면 됩니다. CompletableFutur

wbluke.tistory.com

 

728x90
반응형
반응형

이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet

 

[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet

오늘은 5강을 이어서 학습한다. 1. default, blocking request 아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다. @Sl..

bangpurin.tistory.com

 

 

이전 글의 콜백 지옥을 새로운 클래스 선언을 통해 리팩토링해본다.

//////////////before
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
f1.addCallback(s -> {
    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
    f2.addCallback(s2 -> {
        ListenableFuture<String> f3 = myService.work(s2.getBody());
        f3.addCallback(s3 -> {
            dr.setResult(s3);
        }, e3 -> {
            dr.setErrorResult(e3.getMessage());
                }
        );
    }, e2-> {
        dr.setErrorResult(e2.getMessage());
    });
}, e-> {
    //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
    dr.setErrorResult(e.getMessage());
});

///////////after
Completion
    //최초 실행
    .from(rt.getForEntity(URL1, String.class, "f1" + idx))
    //두번재 받아서 작업하고 return 해야함
    .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
    //어디서든 에러가 나면
    .andError(e -> dr.setErrorResult(e.toString()))
    //수행만하고 끝내는 아이 consumer
    .andAccept(s -> dr.setResult(s.getBody()));
public static class AcceptCompletion extends Completion{
    Consumer<ResponseEntity<String>> con;
    public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
        this.con = con;
    }

    @Override
    void run(ResponseEntity<String> val) {
        con.accept(val);
    }
}

public static class ErrorCompletion extends Completion{
    Consumer<Throwable> econ;
    //listenablefuture 실패 시 받는 throwable 처리
    public ErrorCompletion(Consumer<Throwable> econ) {
        this.econ = econ;
    }

    @Override
    void run(ResponseEntity<String> val) {
        //정상적일때 실행되면 안됨; 패스
        if(next != null){
            next.run(val);
        }
    }

    @Override
    void error(Throwable e) {
        econ.accept(e);
    }
}

public static class ApplyCompletion extends Completion{
    Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
    public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
        this.fn = fn;
    }

    @Override
    void run(ResponseEntity<String> val) {
        ListenableFuture<ResponseEntity<String>> lf = fn.apply(val);
        lf.addCallback(s -> complete(s), e-> error(e));
    }
}

public static class Completion{
    Completion next;

    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
        Completion c = new Completion();

        lf.addCallback(s -> {
            c.complete(s);
        }, e -> {
            c.error(e);
        });
        return c;
    }

    void error(Throwable e) {
        if(next != null){
            next.error(e);
        }
    }

    void complete(ResponseEntity<String> s) {
        if(next != null){
            next.run(s);
        }
    }

    void run(ResponseEntity<String> val) {
    }

    public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fun){
        Completion c = new ApplyCompletion(fun);
        this.next = c;
        return c;
    }

    public void andAccept(Consumer<ResponseEntity<String>> con){
        Completion c = new AcceptCompletion(con);
        this.next = c;
    }

    public Completion andError(Consumer<Throwable> econ){
        Completion c = new ErrorCompletion(econ);
        this.next = c;
        return c;
    }
}

위와 같이 만들자니 세 번째 콜을 아래와 같이 추가할 때 형 변환 에러가 난다.

  .andApply(s -> myService.work(s.getBody)

andApply는 request로 ResponseEntity<String>을 주고 response로 ListenableFuture<ResponseEntity<String>>를 받아야 하지만 work 함수는 그렇지 않다.

그러면 저 request/response를 만족하는 같은 내용의 함수를 또 작성해야 하는가?

뭐 그래도 작동은 하겠지만, 매번 함수를 작성하는 것은 깔끔하지 못하기에 generic을 통해 해결한다.

public static class AcceptCompletion<S> extends Completion<S, Void>{
    Consumer<S> con;
    public AcceptCompletion(Consumer<S> con) {
        this.con = con;
    }

    @Override
    void run(S val) {
        con.accept(val);
    }
}

public static class ErrorCompletion<T> extends Completion<T, T>{
    Consumer<Throwable> econ;
    //listenablefuture 실패 시 받는 throwable 처리
    public ErrorCompletion(Consumer<Throwable> econ) {
        this.econ = econ;
    }

    @Override
    void run(T val) {
        //정상적일때 실행되면 안됨; 패스
        if(next != null){
            next.run(val);
        }
    }

    @Override
    void error(Throwable e) {
        //계속 타고온 에러를 처리
        econ.accept(e);
    }
}

public static class ApplyCompletion<S, T> extends Completion<S, T>{
    Function<S, ListenableFuture<T>> fn;
    public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
        this.fn = fn;
    }

    @Override
    void run(S val) {
        ListenableFuture<T> lf = fn.apply(val);
        lf.addCallback(s -> complete(s), e-> error(e));
    }
}

public static class Completion<S, T>{
    Completion next;

    public static <S, T> Completion<S,T> from(ListenableFuture<T> lf) {
        Completion<S, T> c = new Completion<>();

        lf.addCallback(s -> {
            c.complete(s);
        }, e -> {
            c.error(e);
        });
        return c;
    }

    void error(Throwable e) {
        if(next != null){
            next.error(e);
        }
    }

    void complete(T s) {
        if(next != null){
            next.run(s);
        }
    }

    void run(S val) {
    }

    public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fun){
        Completion<T, V> c = new ApplyCompletion(fun);
        this.next = c;
        return c;
    }

    public void andAccept(Consumer<T> con){
        Completion<T, Void> c = new AcceptCompletion(con);
        this.next = c;
    }

    public Completion<T, T> andError(Consumer<Throwable> econ){
        Completion<T, T> c = new ErrorCompletion<>(econ);
        this.next = c;
        return c;
    }
}

 

728x90
반응형

+ Recent posts