스프링 프레임워크에서는 기본적으로 Unchecked Exception(런타임 예외)이 발생하면 트랜잭션을 롤백합니다.
반면, Checked Exception(체크드 예외)은 기본적으로 트랜잭션을 롤백하지 않습니다. 이는 Checked Exception이 비즈니스 로직 내에서 예외적인 상황으로 처리될 수 있기 때문입니다.
checked exception에서도 롤백이 필요하다면 rollbackFor을 활용하면 된다.
@Transactional(rollbackFor = IOException.class)
public void performTransaction() throws IOException {
repository.save(new Entity());
// Checked Exception 발생
if (true) {
throw new IOException("Checked Exception");
}
repository.save(new Entity());
}
특정 Unchecked Exception에 대해서는 롤백을 원하지 않을 때 noRollbackFor 속성을 사용합니다.
왜 기본적으로 Unchecked Exception에 대해서만 롤백할까?
비즈니스 로직과의 분리: Checked Exception은 비즈니스 로직 내에서 발생할 수 있는 예외적인 상황을 나타내며, 이를 비즈니스 로직의 일부로 처리하는 경우가 많습니다. 반면, Unchecked Exception은 예외적 상황이 아니라 코드의 오류나 논리적 문제를 나타내므로 트랜잭션의 롤백이 필요합니다.
복구 가능성: Checked Exception은 복구 가능한 예외로 간주되어, 개발자가 예외를 처리하여 정상적인 흐름으로 돌아갈 수 있다고 판단할 수 있습니다. Unchecked Exception은 복구가 불가능한 예외로 간주되어 트랜잭션을 롤백하는 것이 합리적입니다.
spring mvc 기반 spring-web의 restTemplate으로 api를 요청하고 있던 와중, restTemplate가 deprecated 된다는 말을 듣고, 대체제인 webClient를 사용해보려고 공부를 하다 보니 webflux의 영역에 들어와 버렸다. 물론 webClient도 sync call을 지원하지만 수많은 api 콜을 비동기로 하면 자연스레 내 api의 속도도 빨라질 것이 아닌가? 위기를 기회로 전환하며 새로운 아키텍처를 익히려고 spring-webflux를 추가하였다.
그런데 spring reactive강의를 듣던 도중, 두 dependency는 spring context의 혼란을 야기하므로 같이 사용하면 안 된다는 말을 들었다. 오래된 강의긴 했지만 나름 스프링 저명인사가 말한 것이기에 안되리라 생각하고 좌절하며 관련 내용을 더 찾아보기로 했다.
A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0.. 1) result.
Mono와 Flux의 차이를 검색해보면, mono는 하나의 값을 return할return 할 때 쓰이는 반면 flux는 n개의 값을 return 할 때 쓰인다고 나온다. 그런데 사실 두 차이를, 어쩌면 그 의미를 잘 모르겠다. 아래의 예시에서는 같은 결과를 mono/flux로 각각 작성 가능하다는 것을 보여준다. 차이는 flux를 쓰면 flux라이브러리의 다양한 오퍼레이션 함수를 사용할 수 있다고 하는데, 데이터를 수정 없이 그대로 내린다면 mono나 flux나 그게 그거 아닌가..?
@GetMapping("/event/{id}")
Mono<List<Event>> hello(@PathVariable long id){
//return Mono.just(new Event(id, "event "+ id));
List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
//데이터를 컬랙션으로 묶어서 다루고, 각 데이터 다루거나 편집할 때 힘듦
return Mono.just(list);
}
@GetMapping(value = "/events")
Flux<Event> events(){
List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
//data stream -> .map 등 사용 가능
return Flux.fromIterable(list);
//같은 결과
//return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}
@Data
@AllArgsConstructor
public static class Event{
long id;
String value;
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
//데이터생성 using stream
//Stream<Event> s = Stream.generate(() -> new Event(System.currentTimeMillis(), "val"));
//data stream -> .map 등 사용 가능; 위와 동일한 결과
//return Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "val")).limit(10));
return Flux //server sent event
.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "val")))
.delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
.take(10);//10개의 request를 보내고 다 오면 cancel 처리
}
stream을 드러내고 flux 기능만을 사용하면 아래와 같겠다.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
return Flux //server sent event
//.range(1, 10) 앞에서 미리 정하고 들어갈 수도 있음;
//.<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value")))//데이터를 계속 흘러서 보내는 역할; type hint 줘야함
//상태값을 바꿔서 리턴; 초기상태, 상태 바꿔주는 함수 그담 상태 리턴
.<Event, Long>generate(()->1L, (id, sink) -> {
sink.next(new Event(id, "value" + id));//값을 받아 이벤트를 생성해서 싱크로 보내고
return id+1;//다음 상태 리턴; id로 들어가겟지
})
.delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
.take(10);//10개의 request를 보내고 다 오면 cancel; 뒤에서 끊는 개념
}
flux의 zip을 활용하면 아래 두 가지 방법이 가능하다.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
Flux<Event> es = Flux //server sent event
//상태값을 바꿔서 리턴; 초기상태, 상태 바꿔주는 함수 그담 상태 리턴
.<Event, Long>generate(()->1L, (id, sink) -> {
sink.next(new Event(id, "value" + id));//값을 받아 이벤트를 생성해서 싱크로 보내고
return id+1;//다음 상태 리턴; id로 들어가겟지
})
//.delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
//.take(10);//10개의 request를 보내고 다 오면 cancel; 뒤에서 끊는 개념
;
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
//결합해서 delay의 효과를 볼 수 있음
return Flux.zip(es, interval).map(tu -> tu.getT1());
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
Flux<String> es = Flux.generate(sink -> sink.next("value"));
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); //0부터 시작
//event 조합
return Flux.zip(es, interval).map(tu -> new Event(tu.getT2(), tu.getT1())).take(10);
}
테스트 자동화 툴인 ReadyAPI(soupUI)에는 script 언어로 groovy가 쓰인다.
아무래도 원하는 바를 쉽게 하려면 groovy를 조금이라도 아는 게 중요, 지인짜 기본적인 것만 언급한다.
코드 끝에 세미콜론 있어도 되고 없어도 되고..
주석
//한줄
/* 여러줄
애해해
ㅇㅇㅇ*/
로그
//()가 있어도 되고
log.info("Result :" + c);
//괄호를 생략해도 가능
log.info "soapUI script"
log.info "aa" + c
변수 선언 def
string to json
import groovy.json.JsonSlurper
def resResponse = "{'a':'bb', 'n':3}"
//string 형태의 json을 진짜 json으로 바꿔주는 lib.function
def json = new JsonSlurper().parseText(restResponse)
//이후에는 json 접근하 듯 접근 가능
log.info json.a
//결과: bb
나머지는 java랑 비슷, 단 stream 쓸 때 {it -> } 로 내부 변수명이 it로 지정되어 있고 별도의 함수가 있는 듯 하다.
//publisher -> publisher ... -> subscriber 시 실행
m = mono
.just(genHello()) //미리 준비하는.. 그 안의 함수도 미리 실행해서 준비해둠 genHello -> mono
.fromSupplier(()-> genHello()) //mono -> genHello 람다식을 던져야 함수가 그 때 실행해
//(==) 람다식의 의미는 아래와 같음
.fromSupplier(new Supplier<String>(){
public String get(){
return genHello()
}
})
m.subscribe();
log.info("pos2")
return m; //spring 이 subscribe
위 처럼 subscribe가 중간에 있다면 어떻게 실행될까 : 두 번 실행됨 subscribe -> pos2 -> subscribe
같은 내용이라 두 번 실행을 막고 싶으면, m.subscribe 내용을 따로 담고 그 내용을 Mono.just로 감싸서 return 하면 됨
모노 컨테이너 안의 데이터를 빼야겠어 = mono.block block 내부에서 subscribe가 실행되면서 내용을 꺼냄
hot publisher vs cold publisher
cold는 replay 데이터를 처음부터 다시 생성해줌
hot은 구독하는 시점부터 데이터를 줌; 처음부터 주는거 아님
subscribe vs block?
subscribe은 스케줄러(쓰레드 모드) 설정에 따라서 블록을 할 수도, 안 할 수도 있음. 결과는 람다식으로 받기 때문에 반드시 블록하지 않아도 됨.
반면에 block은 결과를 람다식에서 콜백으로 받는 게 아니라, 값을 꺼내서 변수 등으로 넘겨야하기 때문에 해당 쓰레드는 블록됨.
supplyAsync: 파라미터로 supplier 인터페이스 받고 반환 값(CompletableFuture <T>) 존재
runAsync: 파라미터로 runnable 인터페이스 받고 반환 값없음
채이닝, 후속작업 thenApply vs thenAccept vs thenRun vs thenCompose vs thenCombine
thenApply: 데이터를 포함하는 future 반환(CompletableFuture <U>)
thenAccept: 파라미터로 Comsumer 받고 반환 값없음 (CompletableFuture <Void>)
thenRun: 파라미터로 runnable 받고 반환 값없음
thenCompose: 앞 단계의 CompletableFuture을 주고받고 하면서 순차적으로 연결 가능
thenCombine: 전혀 다른 CompletableFuture을 첫 인자로 받고, 결과를 연산하는 bifunction을 두 번째 인자로 받음
데이터 가져오기(blocking) get vs join
get: checkedException 던지기 때문에 try/catch 처리 필요
join: uncheckedException 던짐; 예외처리가 내부적
감싸기 completedFuture: 이미 완료된 작업이나 정적인 값을 CompletableFuture로 감쌀 때
예외 발생 시 exceptionally vs handle
exceptionally: 예외 발생 시 해당 예외를 받아서 처리 가능
handle: s, e를 다 받는 bifunction 구현체
별도의 스레드 풀에서 작업 시 ~async 메서드
다른 스레드가 후속 작업을 진행
thenApplyAsync
여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다렸다가(blocking) 처리하려면 allOf
연습 코드
ExecutorService es = Executors.newFixedThreadPool(10);
//Future 비동기 작업 결과를 담고있는 오브젝트 get -> blocking
//listenable future 콜백; 완료 시점에
//CompletableFuture 비동기 작업의 결과를 간단하게 만들 수
//리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 장점
//병렬성과 동시성에서 CP가 의미있는데, 여러 cpu core 사이에 지연실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능
//completion station? stage? 장점은 코어성능 20% 더 효율적임
//then ~ 이전의 스레드를 사용
//then~async 하면 정책에 따라 새로운 스래드 할당
CompletableFuture
.supplyAsync(() -> {
log.info("run");
//if(1==1) throw new RuntimeException(); //exception으로 감
return 1; //생성
})
.thenCompose(s -> { //compose 하면 completedfutre의 값만 넘김
log.info("then {}", s);
return CompletableFuture.completedFuture(s + 1); //받아서 작업하고 넘기고
})
.thenApply(s3 -> {
log.info("then3 {}", s3);
return s3 * 10; //받아서 작업하고 넘기고
})
.exceptionally(e -> -10) //예외발생하면 복구할 때 사용가능
.thenAcceptAsync(s2 -> log.info("thenn2 {}", s2), es) //받아서 끝 어떤 스래드풀인지 알려줘야
;
log.info("exit");
이전 시간에 작성한 코드를 CompletableFuture 방식으로 바꿔본다. 수정된 부분 위주로 작성한다.
아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다.
@Slf4j
public class LoadTest {
static AtomicInteger cnt = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ExecutorService es = Executors.newFixedThreadPool(100);
RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/rest?idx={idx}";
CyclicBarrier barrier = new CyclicBarrier(101);
for(int i = 0; i <100; i++){
es.submit(() -> {
int idx = cnt.addAndGet(1);
//blocking till await을 만난 숫자가 100에 도달할 때 까지 ; 그 순간에 블라킹이 한번에 풀려; 스레드 동기화 가능; 100개가 동시에 시작
//throw exception하기 때문에 callable 구현 필요
barrier.await();
log.info("thread {}", idx);
StopWatch s1 = new StopWatch();
s1.start();
String res = rt.getForObject(url, String.class, idx);
s1.stop();
log.info("elapsed:{} -> {} / {}", idx, s1.getTotalTimeSeconds(), res);
return null; //callable 구현한 인터페이스라고 생각함
});
}
barrier.await();
StopWatch sw = new StopWatch();
sw.start();
//graceful shutdown이라서 먼저 걸고 기다림
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);
sw.stop();
log.info("total time {}", sw.getTotalTimeSeconds());
}
}
@SpringBootApplication
public class Application {
@RestController
public static class MyController{
RestTemplate rt = new RestTemplate();
//thread 1개
//2초씩 대기; 톰캣 큐에 대기
@GetMapping("/rest")
public String rest(int idx){
//RestTemplate blocking; 응답올때까지 대기; 스레드가 더 필요로하는 요청이라면 홀딩
String res = rt.getForObject("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
return res;
}
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
//설정
server.tomcat.threads.max=1
spring.task.execution.pool.core-size=100
@SpringBootApplication
public class RemoteService {
@RestController
public static class MyController{
@GetMapping("/service")
public String service(String req) throws InterruptedException {
Thread.sleep(2_000);
return req + "/service";
}
}
public static void main(String[] args) {
//application.properties 오버라이드하는 설정
System.setProperty("server.port", "8081");
System.setProperty("server.tomcat.threads.max", "1000");
SpringApplication.run(RemoteService.class, args);
}
}
...
12:23:06.118 [pool-1-thread-39] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:06.119 [pool-1-thread-39] INFO toby.live.lecture4.LoadTest - elapsed:39 -> 8.463001595 / hello39/service
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:08.127 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 10.472331585 / hello30/service
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:10.140 [pool-1-thread-34] INFO toby.live.lecture4.LoadTest - elapsed:34 -> 12.485099461 / hello34/service
...
2초 이상이 걸리는 100건의 작업이고 응답을 기다리는 동안은 블로킹되기 때문에 총 200초가량 소요된다. 허나 복잡한 로직도 아니고, 사실 cpu는 놀고 있을 것이다.. 이를 비동기로 바꿔보자.
@RestController
public static class MyController{
AsyncRestTemplate rt = new AsyncRestTemplate();
//thread 1개
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx){
//호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
//비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
return res;
}
}
...
12:42:49.442 [pool-1-thread-54] INFO toby.live.lecture4.LoadTest - elapsed:54 -> 2.516070823 / hello54/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.444 [pool-1-thread-12] INFO toby.live.lecture4.LoadTest - elapsed:12 -> 2.513689601 / hello12/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 2.518126106 / hello98/service
12:42:49.445 [pool-1-thread-89] INFO toby.live.lecture4.LoadTest - elapsed:89 -> 2.515622072 / hello89/service
12:42:49.446 [main] INFO toby.live.lecture4.LoadTest - total time 2.527531376
이렇게 실행하면 총 실행시간이 2초 남짓으로 마치 요청 한 건을 한 것과 비슷하게 나온다. 하지만 자바에서 제공하는 분석 툴인 jmc를 이용하여 /rest를 실행한 애플리케이션의 thread의 개수를 확인해보면..
서블랫 스래드는 하나만 만들어져야 하는데, 사실 뒤에서는 100개 이상의 스레드를 추가적으로 생성해서 요청하고 있음을 알 수 있다. 톰캣이 100개의 스레드를 만들었나 싶은데, 사실 톰캣 스래드는 사진에서 보는 것과 같이 하나고, 자바 api를 통해 백그라운드에서 100개 스레드를 만들어 놓고 실행하는 것이다. 즉 매번 서버 자원을 사용한다는 것인데, 이는 비효율적이다. 최소한의 자원을 사용해야 한다.
3. netty / nonblocking io를 사용하는 client lib 사용
/rest 부분의 rest template를 async rest template으로 변경하고 네티를 사용한다. 네티의 스레드를 1개로 설정한다.
@RestController
public static class MyController{
//Netty4ClientHttpRequestFactory spring 제공
//netty도 스레드 1개
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
//thread 1개
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) throws ExecutionException, InterruptedException {
log.info("/rest {}", idx);
//호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
//비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
return res;
}
}
그리고 실행을 하면 처음에는 3초가 걸리고, 다시 또 실행을 하면.. 8초가 걸리고.. 다시 실행하면 timeout이 난다..
토비님과는 다른 결과.. 왜지ㅠ
...
15:13:45.147 [pool-1-thread-64] INFO toby.live.lecture4.LoadTest - elapsed:64 -> 3.230518378 / hello64/service
15:13:45.148 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 3.229926106 / hello82/service
15:13:45.148 [pool-1-thread-53] INFO toby.live.lecture4.LoadTest - elapsed:53 -> 3.229702047 / hello53/service
15:13:45.148 [pool-1-thread-62] INFO toby.live.lecture4.LoadTest - elapsed:62 -> 3.229505908 / hello62/service
15:13:45.149 [main] INFO toby.live.lecture4.LoadTest - total time 3.242286844
...
15:15:27.268 [pool-1-thread-60] INFO toby.live.lecture4.LoadTest - elapsed:60 -> 8.542415481 / hello60/service
15:15:27.268 [main] INFO toby.live.lecture4.LoadTest - total time 8.556960287
...
15:18:02.122 [pool-1-thread-26] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-18] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.126 [pool-1-thread-12] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.129 [main] INFO toby.live.lecture4.LoadTest - total time 30.97585281
타임아웃이 나는 현상을 지켜보니 아래와 같은 순서로 진행되었다.
LoadTest -> /rest로 100건 요청
/rest에서 100건 요청받고 -> /service로 요청해야 하는데 /service 쪽 로그에 아무것도 안 찍힘
1분 뒤에 /rest에서 async request timeout exception 걸려서 503 return
그 후에 /service가 요청을 받아서 처리 하지만 /rest에서는 이미 exception 처리돼서 무쓸모
왜 /rest는 요청을 바로 받는데 /service로 바로 안 보내고 1분 후에 한 번에 보내는 걸까.. block 되어 있는 건가..
/rest가 1개의 스레드에서 도는데 어디선가 블라킹이 있어서 안 되는 것 같은데, 왠지 모르겠다. 느낌은 저 시대와 지금의 기본적인 풀 세팅 정보가 달라서 그럴 것 같다.
어쨌건 스레드를 100개씩이나 더 생성하지 않는 것은 확인했다.
자꾸 타임아웃이 나지만 우선은 진행한다.
ListenableFuture로 응답을 그대로 내리면 응답을 꺼내서 수정할 수가 없다. 이때 사용 가능한 것이 DeferredResult이다.
ListenableFuture에 콜백을 달고 DeferredResult에 setResult를 하면 응답을 수정할 수 있다
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
//res.get -> blocking 으로 바뀌어버림 의미없음
//callback이 실행되는거지 리턴을 주는게 아님; 응답을 받고 변형하려면? deferredReuslt!
res.addCallback(s -> {
dr.setResult(s.getBody() + "/work"); //응답 변형
}, e-> {
//throw 하면 어디서 에러가 난지 알 수 없어 비동기라
dr.setErrorResult(e.getMessage());
});
return dr;
}
중첩 콜을 원한다면..
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
res.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2-> {
dr.setResult(s2.getBody());
}, e2-> {
dr.setErrorResult(e2.getMessage());
});
}, e-> {
//throw 하면 어디서 에러가 난지 알 수 없어 비동기라
dr.setErrorResult(e.getMessage());
});
return dr;
}
2초짜리 콜을 2번 중첩해서(/service2는 /service의 카피본이다) 4초가량이 걸린다. (하지만 역시 나는 위와 같은 현상이 계속되어서 안되었다..)
3번 중첩한다면..
@Slf4j
@EnableAsync
@SpringBootApplication
public class Application {
@RestController
public static class MyController{
//Netty4ClientHttpRequestFactory spring 제공
//netty도 스레드 1개
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
//thread 1개
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e3 -> {
dr.setErrorResult(e3.getMessage());
}
);
}, e2-> {
dr.setErrorResult(e2.getMessage());
});
}, e-> {
dr.setErrorResult(e.getMessage());
});
return dr;
}
}
@Service
public static class MyService{
@Async
public ListenableFuture<String> work(String req){
return new AsyncResult<String>(req + "/asyncwork");
}
}
@Bean
ThreadPoolTaskExecutor myPool(){
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1); //queue까지 다 차면 맥스
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
주의해야 하는 것은 어디서든 다 async, 비동기가 되게끔 작성해야 한다는 것, 아니면 순식간에 동기로 바뀐다.