반응형

이전 글: 2022.03.30 - [개발/reactive] - [webflux] 실무투입고민

 

[webflux] 실무투입고민

이전 글: 2022.03.25 - [개발/reactive] - [spring] spring-web and spring-webflux [spring] spring-web and spring-webflux spring mvc 기반 spring-web의 restTemplate으로 api를 요청하고 있던 와중, restTemp..

bangpurin.tistory.com

 

현재 실무에서 spring-web + spring-webflux(webClient)인 1번 프로젝트spring-webflux only인 2번 프로젝트를 동시에 작업하고 있다.

비동기를 사용하는 부분은 주로 외부 api call 인 부분이며, 최종 res는 동기로 내려주게 사용 중이다. 

비슷한 로직이어도 두 프로젝트에서 다르게 작동하는 부분이 있어 정리해본다.


1. 비동기로 함수를 실행시키기 위해 block()을 사용

block()/blockFirst()/blockLast() are blocking, which is not supported in thread

1번 프로젝트에서는 spring-mvc 기반이기 때문에 Mono.zip.block()이 가능했지만, 2번 프로젝트에서는 위와 같은 에러가 뜨면서 작동하지 않는다.

2번 프로젝트는 thread blocking이 지원되지 않기 때문에 에러가 난다. 해결방안을 찾아봐도 spring-web을 추가하거나 다른 걸 쓰라고 한다.

대안으로는 Mono.zip.toFuture.get()으로 미래에 완료가 되면 꺼내오는 방식을 사용하면 된다.

 

반대로 1번 프로젝트의 block()을. toFuture(). get()으로 수정한다면?

-> 정상 작동된다.

 

그렇다면 toFuture.get 은 blocking 일까? 왜냐면 두 함수의 동작이 똑같아 보이기 때문이다.

둘 다 subscribes immediately 하기 때문에 그래 보인다!

block()은 스레드를 진짜 블로킹하지만 toFuture은 completableFuture을 리턴하여 작업이 끝날 때까지 기다렸다가 get으로 바디를 꺼내 주는데 즉시 실행하기에 블로킹으로 보인다는.. 것..이라는 것

즉 toFuture가 스레드 관리에 더 flexible 하다.

https://stackoverflow.com/questions/58504527/is-mono-tofuture-blocking

 

Is Mono.toFuture() blocking?

From the Official Documentation of Mono#block() it is said that: Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes ...

stackoverflow.com


Nothing happens until you subscribe… until something does

2. webflux기반인 2번 프로젝트에서는 controller단의 Mono return만으로도 subscribe가 작동한다.

spring-web기반, 혹은 webflux의 서비스나 내부의 함수에서 작동하는 게 아니었다

https://stackoverflow.com/questions/56487429/who-calls-subscribe-on-flux-or-mono-in-reactive-webapplication

 

who calls subscribe on Flux or Mono in reactive webapplication

I am looking at some examples of reactive web applications and i am seeing them like this @RequestMapping(value = "/{id}", method = RequestMethod.GET) @ResponseBody public Mono<Person>

stackoverflow.com

https://github.com/reactor/reactor-netty/blob/db27625064fc78f8374c1ef0af3160ec3ae979f4/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java#L962

 

GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty - GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty

github.com


Nothing happens until you subscribe… until something does

 

3. webflux 기반인 프로젝트에서 애매하게. toFuture(). get()을 결과에 써놓고 아무 데서도 subscribe를 하지 않는다면.. (메인 스레드는 subscribe가 올 때까지 무한) pending이 걸린다..

//컨트롤러
public BaseResponse<UserDetailRes> getUserDetailInfo(@RequestParam String memberValue) {
    return getUserDetailInfoSS(memberValue).toFuture().get();
}

//서비스
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfoSS(String memberValue){
	Mono<String> a = callA(); //webclient 호출
	Mono<Object> b = ...; //webclient 호출
	Mono<Object> c = ...; //webclient 호출

	return Mono.zip(a, b, c).flatMap();
}

여기서 엄청난 고민을 했었는데 두 가지 방법으로 해결할 수 있다.

첫 번째. Mono 함수에서 subscribeOn으로 다른 스레드에게 할당을 해서 해소한다. 근데 이게 온전한 해결방법인지는 모르겠다. 뭔가 얻어걸린듯한..

처음에는 subscribeOn의 추가만으로 정상 작동한다는 게 이해가 안 갔는데,, 다른 스레드가 구독을 한다는 의미로 받아들여지는 것 같다.

여기서 또 특이한 점은 모든 Mono 함수에 subscribeOn을 넣을 필요도 없고 맨 처음 함수에만 넣어도 된다. 심지어 zip에 넣어도 작동한다.

즉 위의 예시에서는 callA()의 함수만 넣어도 b, c도 동일한 스레드로 작동하기 때문에 두 번 넣을 필요가 없다.

단 a는 넣지 않고 b나 c에 넣으면 다시 작동이 안 된다..

형태가 맘에 안 든다면 zip에 넣어도 된다. 이는 해당 job 전체를 별도 스레드 할당하는 것이고 위는 그 api를 별도 스레드로 할당한다는 차이가 있다.

Schedulers.boundedElastic()옵션은 다른 논블로킹 작업들에게 영향 없이 하나의 싱글 스레드에서 블로킹 작업한다는 의미이다. 다른 옵션들도 많이 있다.

//다른 서비스
public Mono<String> callA() {
	return webclient.get()
            .uri()
            .retrieve()
            .bodyToMono(생략)
            .subscribeOn(Schedulers.boundedElastic()) //스레드 할당
	;
}

//서비스
//subscribeOn을 어디서 호출하는게 좋을지 고민이 필요하다
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfoSS(String memberValue){
	Mono<String> a = callA(); //webclient 호출
	Mono<Object> b = callB().subscribeOn(Schedulers.boundedElastic()) //webclient 호출
	Mono<Object> c = ...; //webclient 호출

	return Mono.zip(a, b, c)
        .flatMap()
        .subscribeOn(Schedulers.boundedElastic())
    ;
}

 

두 번째. 아까 위에서 언급한, 'webflux의 경우 controller return에서 subscribe를 해준다'를 응용한다!

subscribeOn은 지우고(물론 별도 스레드를 할당해서 할 거면 둬야지) mono로 리턴하게 바꾼다.

//컨트롤러
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfo(@RequestParam String memberValue) {
    return getUserDetailInfoSS(memberValue);
}

//서비스
public Mono<BaseResponse<UserDetailRes>> getUserDetailInfoSS(String memberValue){
	Mono<String> a = callA(); //webclient 호출
	Mono<Object> b = ...; //webclient 호출
	Mono<Object> c = ...; //webclient 호출

	return Mono.zip(a, b, c).flatMap();
}

이렇게 명료한 것을.. spring-web의 block의 늪에 빠져 큰 그림을 못 본 점 + 서비스에서 모노 리턴도 subscribe가 되는 거 아닌가 생각했던 안일한 생각으로 인해 돌고 돌아서 도착한 것 같다.

 


참고

https://projectreactor.io/docs/core/release/reference/#getting-started

 

Reactor 3 Reference Guide

10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3) 1

projectreactor.io

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[reactor] Sinks  (0) 2022.06.13
[webclient] 비슷한데 뭘 써야할지 모르겠는 것들  (0) 2022.04.01
[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25

+ Recent posts