supplyAsync: 파라미터로 supplier 인터페이스 받고 반환 값(CompletableFuture <T>) 존재
runAsync: 파라미터로 runnable 인터페이스 받고 반환 값없음
채이닝, 후속작업 thenApply vs thenAccept vs thenRun vs thenCompose vs thenCombine
thenApply: 데이터를 포함하는 future 반환(CompletableFuture <U>)
thenAccept: 파라미터로 Comsumer 받고 반환 값없음 (CompletableFuture <Void>)
thenRun: 파라미터로 runnable 받고 반환 값없음
thenCompose: 앞 단계의 CompletableFuture을 주고받고 하면서 순차적으로 연결 가능
thenCombine: 전혀 다른 CompletableFuture을 첫 인자로 받고, 결과를 연산하는 bifunction을 두 번째 인자로 받음
데이터 가져오기(blocking) get vs join
get: checkedException 던지기 때문에 try/catch 처리 필요
join: uncheckedException 던짐; 예외처리가 내부적
감싸기 completedFuture: 이미 완료된 작업이나 정적인 값을 CompletableFuture로 감쌀 때
예외 발생 시 exceptionally vs handle
exceptionally: 예외 발생 시 해당 예외를 받아서 처리 가능
handle: s, e를 다 받는 bifunction 구현체
별도의 스레드 풀에서 작업 시 ~async 메서드
다른 스레드가 후속 작업을 진행
thenApplyAsync
여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다렸다가(blocking) 처리하려면 allOf
연습 코드
ExecutorService es = Executors.newFixedThreadPool(10);
//Future 비동기 작업 결과를 담고있는 오브젝트 get -> blocking
//listenable future 콜백; 완료 시점에
//CompletableFuture 비동기 작업의 결과를 간단하게 만들 수
//리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 장점
//병렬성과 동시성에서 CP가 의미있는데, 여러 cpu core 사이에 지연실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능
//completion station? stage? 장점은 코어성능 20% 더 효율적임
//then ~ 이전의 스레드를 사용
//then~async 하면 정책에 따라 새로운 스래드 할당
CompletableFuture
.supplyAsync(() -> {
log.info("run");
//if(1==1) throw new RuntimeException(); //exception으로 감
return 1; //생성
})
.thenCompose(s -> { //compose 하면 completedfutre의 값만 넘김
log.info("then {}", s);
return CompletableFuture.completedFuture(s + 1); //받아서 작업하고 넘기고
})
.thenApply(s3 -> {
log.info("then3 {}", s3);
return s3 * 10; //받아서 작업하고 넘기고
})
.exceptionally(e -> -10) //예외발생하면 복구할 때 사용가능
.thenAcceptAsync(s2 -> log.info("thenn2 {}", s2), es) //받아서 끝 어떤 스래드풀인지 알려줘야
;
log.info("exit");
이전 시간에 작성한 코드를 CompletableFuture 방식으로 바꿔본다. 수정된 부분 위주로 작성한다.
아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다.
@Slf4j
public class LoadTest {
static AtomicInteger cnt = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ExecutorService es = Executors.newFixedThreadPool(100);
RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/rest?idx={idx}";
CyclicBarrier barrier = new CyclicBarrier(101);
for(int i = 0; i <100; i++){
es.submit(() -> {
int idx = cnt.addAndGet(1);
//blocking till await을 만난 숫자가 100에 도달할 때 까지 ; 그 순간에 블라킹이 한번에 풀려; 스레드 동기화 가능; 100개가 동시에 시작
//throw exception하기 때문에 callable 구현 필요
barrier.await();
log.info("thread {}", idx);
StopWatch s1 = new StopWatch();
s1.start();
String res = rt.getForObject(url, String.class, idx);
s1.stop();
log.info("elapsed:{} -> {} / {}", idx, s1.getTotalTimeSeconds(), res);
return null; //callable 구현한 인터페이스라고 생각함
});
}
barrier.await();
StopWatch sw = new StopWatch();
sw.start();
//graceful shutdown이라서 먼저 걸고 기다림
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);
sw.stop();
log.info("total time {}", sw.getTotalTimeSeconds());
}
}
@SpringBootApplication
public class Application {
@RestController
public static class MyController{
RestTemplate rt = new RestTemplate();
//thread 1개
//2초씩 대기; 톰캣 큐에 대기
@GetMapping("/rest")
public String rest(int idx){
//RestTemplate blocking; 응답올때까지 대기; 스레드가 더 필요로하는 요청이라면 홀딩
String res = rt.getForObject("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
return res;
}
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
//설정
server.tomcat.threads.max=1
spring.task.execution.pool.core-size=100
@SpringBootApplication
public class RemoteService {
@RestController
public static class MyController{
@GetMapping("/service")
public String service(String req) throws InterruptedException {
Thread.sleep(2_000);
return req + "/service";
}
}
public static void main(String[] args) {
//application.properties 오버라이드하는 설정
System.setProperty("server.port", "8081");
System.setProperty("server.tomcat.threads.max", "1000");
SpringApplication.run(RemoteService.class, args);
}
}
...
12:23:06.118 [pool-1-thread-39] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:06.119 [pool-1-thread-39] INFO toby.live.lecture4.LoadTest - elapsed:39 -> 8.463001595 / hello39/service
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:08.127 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 10.472331585 / hello30/service
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:10.140 [pool-1-thread-34] INFO toby.live.lecture4.LoadTest - elapsed:34 -> 12.485099461 / hello34/service
...
2초 이상이 걸리는 100건의 작업이고 응답을 기다리는 동안은 블로킹되기 때문에 총 200초가량 소요된다. 허나 복잡한 로직도 아니고, 사실 cpu는 놀고 있을 것이다.. 이를 비동기로 바꿔보자.
@RestController
public static class MyController{
AsyncRestTemplate rt = new AsyncRestTemplate();
//thread 1개
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx){
//호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
//비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
return res;
}
}
...
12:42:49.442 [pool-1-thread-54] INFO toby.live.lecture4.LoadTest - elapsed:54 -> 2.516070823 / hello54/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.444 [pool-1-thread-12] INFO toby.live.lecture4.LoadTest - elapsed:12 -> 2.513689601 / hello12/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 2.518126106 / hello98/service
12:42:49.445 [pool-1-thread-89] INFO toby.live.lecture4.LoadTest - elapsed:89 -> 2.515622072 / hello89/service
12:42:49.446 [main] INFO toby.live.lecture4.LoadTest - total time 2.527531376
이렇게 실행하면 총 실행시간이 2초 남짓으로 마치 요청 한 건을 한 것과 비슷하게 나온다. 하지만 자바에서 제공하는 분석 툴인 jmc를 이용하여 /rest를 실행한 애플리케이션의 thread의 개수를 확인해보면..
서블랫 스래드는 하나만 만들어져야 하는데, 사실 뒤에서는 100개 이상의 스레드를 추가적으로 생성해서 요청하고 있음을 알 수 있다. 톰캣이 100개의 스레드를 만들었나 싶은데, 사실 톰캣 스래드는 사진에서 보는 것과 같이 하나고, 자바 api를 통해 백그라운드에서 100개 스레드를 만들어 놓고 실행하는 것이다. 즉 매번 서버 자원을 사용한다는 것인데, 이는 비효율적이다. 최소한의 자원을 사용해야 한다.
3. netty / nonblocking io를 사용하는 client lib 사용
/rest 부분의 rest template를 async rest template으로 변경하고 네티를 사용한다. 네티의 스레드를 1개로 설정한다.
@RestController
public static class MyController{
//Netty4ClientHttpRequestFactory spring 제공
//netty도 스레드 1개
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
//thread 1개
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) throws ExecutionException, InterruptedException {
log.info("/rest {}", idx);
//호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
//비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
return res;
}
}
그리고 실행을 하면 처음에는 3초가 걸리고, 다시 또 실행을 하면.. 8초가 걸리고.. 다시 실행하면 timeout이 난다..
토비님과는 다른 결과.. 왜지ㅠ
...
15:13:45.147 [pool-1-thread-64] INFO toby.live.lecture4.LoadTest - elapsed:64 -> 3.230518378 / hello64/service
15:13:45.148 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 3.229926106 / hello82/service
15:13:45.148 [pool-1-thread-53] INFO toby.live.lecture4.LoadTest - elapsed:53 -> 3.229702047 / hello53/service
15:13:45.148 [pool-1-thread-62] INFO toby.live.lecture4.LoadTest - elapsed:62 -> 3.229505908 / hello62/service
15:13:45.149 [main] INFO toby.live.lecture4.LoadTest - total time 3.242286844
...
15:15:27.268 [pool-1-thread-60] INFO toby.live.lecture4.LoadTest - elapsed:60 -> 8.542415481 / hello60/service
15:15:27.268 [main] INFO toby.live.lecture4.LoadTest - total time 8.556960287
...
15:18:02.122 [pool-1-thread-26] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-18] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.126 [pool-1-thread-12] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.129 [main] INFO toby.live.lecture4.LoadTest - total time 30.97585281
타임아웃이 나는 현상을 지켜보니 아래와 같은 순서로 진행되었다.
LoadTest -> /rest로 100건 요청
/rest에서 100건 요청받고 -> /service로 요청해야 하는데 /service 쪽 로그에 아무것도 안 찍힘
1분 뒤에 /rest에서 async request timeout exception 걸려서 503 return
그 후에 /service가 요청을 받아서 처리 하지만 /rest에서는 이미 exception 처리돼서 무쓸모
왜 /rest는 요청을 바로 받는데 /service로 바로 안 보내고 1분 후에 한 번에 보내는 걸까.. block 되어 있는 건가..
/rest가 1개의 스레드에서 도는데 어디선가 블라킹이 있어서 안 되는 것 같은데, 왠지 모르겠다. 느낌은 저 시대와 지금의 기본적인 풀 세팅 정보가 달라서 그럴 것 같다.
어쨌건 스레드를 100개씩이나 더 생성하지 않는 것은 확인했다.
자꾸 타임아웃이 나지만 우선은 진행한다.
ListenableFuture로 응답을 그대로 내리면 응답을 꺼내서 수정할 수가 없다. 이때 사용 가능한 것이 DeferredResult이다.
ListenableFuture에 콜백을 달고 DeferredResult에 setResult를 하면 응답을 수정할 수 있다
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
//res.get -> blocking 으로 바뀌어버림 의미없음
//callback이 실행되는거지 리턴을 주는게 아님; 응답을 받고 변형하려면? deferredReuslt!
res.addCallback(s -> {
dr.setResult(s.getBody() + "/work"); //응답 변형
}, e-> {
//throw 하면 어디서 에러가 난지 알 수 없어 비동기라
dr.setErrorResult(e.getMessage());
});
return dr;
}
중첩 콜을 원한다면..
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
res.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2-> {
dr.setResult(s2.getBody());
}, e2-> {
dr.setErrorResult(e2.getMessage());
});
}, e-> {
//throw 하면 어디서 에러가 난지 알 수 없어 비동기라
dr.setErrorResult(e.getMessage());
});
return dr;
}
2초짜리 콜을 2번 중첩해서(/service2는 /service의 카피본이다) 4초가량이 걸린다. (하지만 역시 나는 위와 같은 현상이 계속되어서 안되었다..)
3번 중첩한다면..
@Slf4j
@EnableAsync
@SpringBootApplication
public class Application {
@RestController
public static class MyController{
//Netty4ClientHttpRequestFactory spring 제공
//netty도 스레드 1개
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
//thread 1개
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
log.info("/rest {}", idx);
DeferredResult<String> dr= new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e3 -> {
dr.setErrorResult(e3.getMessage());
}
);
}, e2-> {
dr.setErrorResult(e2.getMessage());
});
}, e-> {
dr.setErrorResult(e.getMessage());
});
return dr;
}
}
@Service
public static class MyService{
@Async
public ListenableFuture<String> work(String req){
return new AsyncResult<String>(req + "/asyncwork");
}
}
@Bean
ThreadPoolTaskExecutor myPool(){
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1); //queue까지 다 차면 맥스
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
주의해야 하는 것은 어디서든 다 async, 비동기가 되게끔 작성해야 한다는 것, 아니면 순식간에 동기로 바뀐다.
@Slf4j
@EnableAsync //있어야 async 작동
@SpringBootApplication
public class Spring {
@Component
public static class MyService{
//future인데 등록해서 받아볼 수 있게하는, 스프링이 만든 ListenableFuture
//요청할 때 마다 스레드를 계속 만들어 비효율적인 simpleAsyncTaskExecutor -> 빈으로 풀 등록해서 쓰자
//비동기가 여러개고 스레드 풀을 분리해서 지정하고 싶으면 Async("name") 지정 가능
@Async
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(2_000);
return new AsyncResult<>("hello");
}
}
//task executor를 빈으로 해두면 알아서 가져가서 쓰게 되어있음
@Bean
ThreadPoolTaskExecutor threadPoolExecutor(){
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(10);//기본으로 만드는거; 첫번째 스레드 요청이 올 때; jmx를 통해 런타임에도 값 수정이 가능
te.setQueueCapacity(200);//기본 풀이 다 차면 큐에 먼저 차
te.setMaxPoolSize(100);//큐가 꽉 찼을 때 에러나는데 그걸 막아주려고 하는 설정
te.setThreadNamePrefix("async-");
te.initialize();
return te;
}
public static void main(String[] args) {
//try with resource로 실행하면 작업 후 알아서 종료
try(ConfigurableApplicationContext c = SpringApplication.run(Spring.class, args)){
}
}
@Autowired MyService myService;
//모든 빈 등록 마치면 바로 실행 됨
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
log.info("Exit");
};
}
}
여기서 주의할 점.
ThreadPoolTaskExecutor에 max pool size에 설정하는 값은 큐가 다 차고 나서 늘어가는 풀 사이즈이다.
@Slf4j
public class LoadTest {
static AtomicInteger cnt = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100);
RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/callable";
StopWatch sw = new StopWatch();
sw.start();
for(int i = 0; i <100; i++){
es.execute((() -> {
int idx = cnt.addAndGet(1);
log.info("thread {}", idx);
StopWatch s1 = new StopWatch();
s1.start();
rt.getForObject(url, String.class);
s1.stop();
log.info("elapsed:{} -> {}", idx, s1.getTotalTimeSeconds());
}));
}
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);
sw.stop();
log.info("total time {}", sw.getTotalTimeSeconds());
}
}
@RestController
public static class MyController{
@GetMapping("/callable")
public String callable() throws InterruptedException {
log.info("async");
Thread.sleep(2_000);
return "hello";
}
}
위 요청은 아래 그림과 같이 표현된다.
load test는 100개의 스래드를 만들어 거의 동시에 요청했으나, 순차 처리로 인해 총 소요 시간은 200초가 넘는다.
...
13:04:56.428 [pool-1-thread-50] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.428 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-90] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
...
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:04.994 [pool-1-thread-61] INFO toby.live.lecture4.LoadTest - elapsed:61 -> 188.612548654
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:06.999 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 190.618529907
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:09.003 [pool-1-thread-71] INFO toby.live.lecture4.LoadTest - elapsed:71 -> 192.622624014
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:11.009 [pool-1-thread-32] INFO toby.live.lecture4.LoadTest - elapsed:32 -> 194.629844982
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:13.013 [pool-1-thread-35] INFO toby.live.lecture4.LoadTest - elapsed:35 -> 196.631976988
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:15.016 [pool-1-thread-59] INFO toby.live.lecture4.LoadTest - elapsed:59 -> 198.634385976
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:17.019 [pool-1-thread-65] INFO toby.live.lecture4.LoadTest - elapsed:65 -> 200.638424197
/callable 쪽은 요청을 거의 동시에 100개의 요청을 받았지만 일반 동기 방식이므로 하나씩 처리한다.
...
2022-03-17 13:08:06.998 INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : async
2022-03-17 13:08:09.002 INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : async
2022-03-17 13:08:11.008 INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : async
2022-03-17 13:08:13.012 INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : async
2022-03-17 13:08:15.014 INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : async
3. callable
/callable 쪽만 소스를 수정한다.
@RestController
public static class MyController{
@GetMapping("/callable")
public Callable<String> callable() throws InterruptedException {
log.info("callable");
//Callable Interface 익명 클래스
return () -> {
log.info("async");
Thread.sleep(2_000);
return "hello";
};
}
callable은 요청을 받으면 별도의 스레드를 생성해 일을 병렬로 처리한다.
100개의 스래드가 거의 동시에 요청을 보내 100개의 작업 스래드가 받아서 동시에 처리하기 때문에 총 소요시간도 2초 남짓이다.
...
13:33:24.694 [pool-1-thread-63] INFO toby.live.lecture4.LoadTest - elapsed:63 -> 2.289299445
13:33:24.694 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 2.291355199
13:33:24.695 [pool-1-thread-67] INFO toby.live.lecture4.LoadTest - elapsed:67 -> 2.292713823
13:33:24.695 [pool-1-thread-81] INFO toby.live.lecture4.LoadTest - elapsed:80 -> 2.289499954
13:33:24.695 [pool-1-thread-3] INFO toby.live.lecture4.LoadTest - elapsed:3 -> 2.290040178
13:33:24.695 [pool-1-thread-57] INFO toby.live.lecture4.LoadTest - elapsed:57 -> 2.292708376
13:33:24.696 [pool-1-thread-44] INFO toby.live.lecture4.LoadTest - elapsed:44 -> 2.29307173
13:33:24.696 [main] INFO toby.live.lecture4.LoadTest - total time 2.307467369
/callable이 1개의 서블릿 스레드로 받았지만 별도의 작업 스래드로 바로 토스하여 일을 하는 모습..
...
2022-03-17 13:33:22.624 INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : callable
2022-03-17 13:33:22.624 INFO 23081 --- [ task-98] toby.live.lecture4.SpringWeb : async
2022-03-17 13:33:22.625 INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : callable
2022-03-17 13:33:22.625 INFO 23081 --- [ task-99] toby.live.lecture4.SpringWeb : async
2022-03-17 13:33:22.625 INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb : callable
2022-03-17 13:33:22.625 INFO 23081 --- [ task-100] toby.live.lecture4.SpringWeb : async
4. DeferredResult
DeferredResult란 “지연된 결과”를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 비동기 작업이다.
기존에 /callable api가 있던 곳에 아래와 같이 신규 api를 추가해준다.
Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();
@GetMapping("/dr")
public DeferredResult<String> dr(){
log.info("dr");
//DR은 setResult가 오기전 까지 응답을 끊지 않고 대기, 그래서 테스트 시 타임아웃 시간을 길게
//결과가 오면 바로 return
DeferredResult<String> dr = new DeferredResult<>(600_000L);
results.add(dr);
return dr;
}
@GetMapping("/dr/count")
public String drcount(){
return String.valueOf(results.size());
}
@GetMapping("/dr/event")
public String drevent(String msg){
for(DeferredResult<String> dr: results){
dr.setResult("hello " + msg);
results.remove(dr);
}
return "OK";
}
load test에 /dr을 쏘게끔 설정하고 실행하면 100개의 요청을 우르르 보내지만 아무 응답을 받지 않고 대기하는 모습을 보이는데
이때 /dr/count를 쏴보면 100개의 요청이 큐에 들어있는 것을 확인할 수 있다.
그 상태에서 /dr/event?msg=today 를 요청하면 그제야 100개의 응답이 한 번에 온다. setResult라는 이벤트를 주면 리턴하는 방식이다.
...
13:48:17.039 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 213.022709247
13:48:17.039 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 213.027111781
13:48:17.039 [pool-1-thread-16] INFO toby.live.lecture4.LoadTest - elapsed:16 -> 213.026715767
13:48:17.040 [pool-1-thread-1] INFO toby.live.lecture4.LoadTest - elapsed:1 -> 213.025866684
5. ResponseBodyEmitter
ResponseBodyEmitter는 하나의 요청에 대한 응답을 여러번에 나눠서 보낼 때 사용된다.
이전에 프로젝트에 actuator를 적용하여 /health url을 통해 배포된 소스의 해쉬, 브랜치, 잘 떠있는지 유무 등을 알 수 있었다.
스프링 actuator는 metrics를 통해 애플리케이션의 현 자원 상태(스레드 풀 상태, db 커넥션 상태, jvm gc상태 등)를 알 수 있게 하는데, 이를 주기적으로 요청하여 그 흐름을 관찰할 수도 있다. 특히 스프링에서 지원하는 prometheus를 추가적으로 설정하면 prometheus서버가 원하는 포맷으로 변환해주기 때문에 쉽게 prometheus서버와 연동이 가능하다.
오늘은 서비스 모니터링에 주로 쓰이는 prometheus를 추가적으로 설정해본다.
이전 글에서 설정한 그대로 작업 시작한다.
1. gradle build에 추가
//기존에 있었음
implementation 'org.springframework.boot:spring-boot-starter-actuator'
//actuator prometheus 추가
implementation("io.micrometer:micrometer-registry-prometheus")
시간이 오래 걸리는 작업을 할 때는 submit 추천. future object를 리턴하기 때문에 나중에 get으로 꺼낼 수 있음(get은 블로킹!)
그냥 스레드의 병렬 처리만 필요하면 execute
execute()
submit()
accept
Runnable
Runnable and Callable
declared in
Executor interface
ExecutorService interface
return
void
a Future object
es.shutdown 은 graceful shutdown이라서 할 거 다 할 때까지 기다림
es.shutdownNow 가 강제 종료
es.awaitTermination 은 타임아웃을 기다리는 용도로 실제 shutdown 시키지는 않음
따라서 shutdown 먼저 걸로 awaitTermination을 써야 함
1. Future class
public static void main(String[] args) throws InterruptedException, ExecutionException {
//스래드를 새로 만들고 반납하는 것이 비용이 많이 들어가니, 풀이 나옴
//맥시멈 제한없고 처음에 스레드 없음 요청할 때 새로 만들고 다음 요청 시 기존 만든 스레드 실행
ExecutorService es = Executors.newCachedThreadPool();
//es.execute -> void
//Future 비동기 핸들러지 결과물은 아님
Future<String> f = es.submit(() -> {
Thread.sleep(5_000);
log.info("helloo");
return "hello";
});
//f.isDone() -> non blocking method;
System.out.println(f.isDone());
log.info("before");
//f.get -> blocking method
System.out.println(f.get()); //비동기가 완성될 때 까지 블로킹(20초 기다림)
log.info("exit");//맨 마지막
System.out.println(f.isDone());
}
2. FutureTask class
Future 자체를 object로 만들어줌
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<String> f = new FutureTask<String>(() -> {
Thread.sleep(3_000);
log.info("async");
return "hello";
}){//익명클래스
@Override
protected void done() { //다 하면 이거해; f.get 할 필요없음
try {
System.out.println(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
es.execute(f);
System.out.println(f.isDone());
//System.out.println(f.get());
es.shutdown(); //다 끝나면 종료
}
3. FutureTask 확장
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(3_000);
if(1 == 1){
throw new RuntimeException(">async error");
}
log.info("async");
return "hello";
}, s -> System.out.println("success:" + s)
, e -> System.out.println(">error: " + e.getMessage())
);
es.execute(f);
es.shutdown(); //다 끝나면 종료
}
interface SuccessCallback{
void onSuccess(String result);
}
interface ExceptionCallback{
void onError(Throwable t);
}
public static class CallbackFutureTask extends FutureTask<String>{
SuccessCallback sc;
ExceptionCallback ec;
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
//if(sc == null) throw null; //null point exception
this.sc = Objects.requireNonNull(sc); //if null -> NPE
this.ec = Objects.requireNonNull(ec);
}
@Override
protected void done() {
try {
sc.onSuccess(get());
} catch (InterruptedException e) { //종료 시그널
Thread.currentThread().interrupt();
} catch (ExecutionException e) { // 찐에러
ec.onError(e.getCause());
}
}
}
허나 springboot 2.6.2에서 사용하려고 봤더니 아래와 같이 deprecated 되었다고 나오는 것이 아닌가..
//deprecated since 2.5
@Deprecated
public class ChainedTransactionManager implements PlatformTransactionManager
그래서 대안이 뭔고 봤더니, TransactionSynchronization를 사용하라고 한다.
Instead of using ChainedTransactionManager for attaching callbacks to transaction commit (pre commit/post commit), either register a TransactionSynchronization to explicitly follow transaction cleanup with simplified semantics in case of exceptions. See Also: TransactionSynchronization.beforeCommit(boolean), TransactionSynchronization.afterCommit()
로그 레벨을 DEBUG로 프로젝트를 실행하니 아래와 같은 에러가 빵빵 지나가는 것이 아닌가..
grpc를 사용하는 프로젝트라, grpc server가 내려가 있어서 발행하는 것이 아닌가 싶어서 기동 후 다시 봤는데도 마찬가지였다.
java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled
at io.grpc.netty.shaded.io.netty.util.internal.ReflectionUtil.trySetAccessible(ReflectionUtil.java:31)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$4.run(PlatformDependent0.java:239)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0.<clinit>(PlatformDependent0.java:233)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.isAndroid(PlatformDependent.java:294)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.<clinit>(PlatformDependent.java:93)
at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:223)
at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:210)
at io.grpc.netty.shaded.io.netty.util.AsciiString.cached(AsciiString.java:1401)
at io.grpc.netty.shaded.io.netty.util.AsciiString.<clinit>(AsciiString.java:48)
at io.grpc.netty.shaded.io.grpc.netty.Utils.<clinit>(Utils.java:78)
at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)
...
java.lang.IllegalAccessException: class io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @61874b9d
at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:361)
at java.base/java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:591)
at java.base/java.lang.reflect.Method.invoke(Method.java:558)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6.run(PlatformDependent0.java:353)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0.<clinit>(PlatformDependent0.java:344)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.isAndroid(PlatformDependent.java:294)
at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.<clinit>(PlatformDependent.java:93)
at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:223)
at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:210)
at io.grpc.netty.shaded.io.netty.util.AsciiString.cached(AsciiString.java:1401)
at io.grpc.netty.shaded.io.netty.util.AsciiString.<clinit>(AsciiString.java:48)
at io.grpc.netty.shaded.io.grpc.netty.Utils.<clinit>(Utils.java:78)
at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)
원인: 자바 11부터 Object의 접근권한이 더이상 증가하지 않는다는 에러. 에러가 아예 안 나려면(정상 액션을 원한다면) 자바 8을 사용하라는데, 발전하는 IT 시대에 역행은 오버라고 생각한다.. 자바 11에 AccessibleObject라는 새로운 클래스가 생겼으니, 아마 관련 라이브러리들이 곧 수정하지 않을까 하는 생각..
위 코드를 돌리면 아래와 같이 로그가 찍힌다. 보이는 것처럼 main 스레드가 모든 일을 다 하고 있고, 만약 중간에 블로킹이 걸리면 main스레드는 진행을 하지 못하게 된다. pub - sub 사이의 일은 다른 스레드가 하게끔 하고 main스레드는 지나치게 하려면 어떻게 할까
subscribeOn
주는자(publisher)가 느리거나 예측 불가할 경우 & 처리(consumer)가 빠를 때
subscriber를 별도의 스레드로 분리
public static void main(String[] args) {
Flow.Publisher<Integer> pub = subscriber -> {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onComplete();
}
@Override
public void cancel() {
}
});
};
Flow.Publisher<Integer> subOnPub = subscriber -> {
ExecutorService es = Executors.newSingleThreadExecutor(); //1개 스레드, 몰리면 큐에 넣고 순차처리
es.execute(()->{
pub.subscribe(subscriber);
});
};
subOnPub.subscribe(new Flow.Subscriber<Integer>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.debug("onSubscribe");
subscription.request(Long.MAX_VALUE); //다 보내
}
@Override
public void onNext(Integer item) {
log.debug("onNext:{}", item);
}
@Override
public void onError(Throwable throwable) {
log.debug("onError:{}", throwable);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
System.out.println("EXXXXIT");
}
이전에 filter와 interceptor의 차이를 보았는데, 오늘은 filter와 oncePerRequestFilter의 차이를 보려고 한다. 그런데 이게 필턴데 스프링 컨텍스트 안에서 작용한다고? 그럼 또 interceptor를 놓칠 수 없다..
OncePerRequestFilter
OncePerRequestFilter는 Spring Framework에서 제공하는 필터로, Spring의 서블릿 컨테이너에서 동작한다. 이는 Spring의 일반적인 필터 체인에서 동작하는데, Spring이 제공하는 기능이기 때문에 Spring 이외의 서블릿 컨테이너에서는 자동으로 동작하지 않는다. 또한 extend로 필터를 구현하며 @Component로 등록되어 있어야 한다.
A Filter can be called either before or after servlet execution. When a request is dispatched to a servlet, the RequestDispatcher may forward it to another servlet. There's a possibility that the other servlet also has the same filter. In such scenarios, the same filter gets invoked multiple times.
Spring guarantees that the OncePerRequestFilter is executed only once for a given request.
spring은 filter에서 spring config 설정 정보를 쉽게 처리하기 위한 GenericFilterBean을 제공한다.
사용자는 요청을 한 번 보냈지만 redirect나 내부적인 이슈로 한 요청이 여러 번의 요청을 낳게 될 수 있다. 이 경우 인증, 로깅 등 필터가 (의도치 않게) 중첩적으로 호출되는데 이를 방지하고 하는 필터가 OncePerRequestFilter이다. OncePerRequestFilter 역시 GenericFilterBean상속받아 구현되어 있다. 즉 스프링이 제어한다는 것이고, 스프링은 OncePerRequestFilter가 주어진 요청에 대해 단 한 번만 수행되는 것을 보장한다.
OncePerRequestFilter는 다음과 같이 동작합니다:
서블릿 컨테이너에서 Spring 필터 체인이 실행될 때, OncePerRequestFilter는 요청을 가로채서 처리
필터 체인을 통해 요청을 전달할 수 있으며, 필터 체인 내에서 다른 필터들이 중복 실행되지 않도록 보장
필터 체인에서 중복 실행 방지가 필요한 경우에 사용. 일반적인 javax.servlet.Filter는 요청이 FORWARD 또는 INCLUDE로 서블릿 내부에서 재처리될 때 여러 번 호출될 수 있지만, OncePerRequestFilter는 각 요청당 한 번만 실행됩니다.
실제 동작 흐름:
아무 설정을 하지 않아도 Spring Boot는 Spring 필터(Specifically, Spring Security 등)를 서블릿 컨테이너 필터보다 먼저 실행되도록 자동으로 설정합니다. 이는 Spring Boot의 자동 구성(Autoconfiguration) 기능과 내부 필터 체인 관리방식 덕분입니다. Spring Boot는 애플리케이션 시작 시 자동으로 Spring 필터들을 서블릿 컨테이너에서 관리되는 필터보다 우선적으로 실행되도록 설정합니다.
서블릿 컨테이너가 요청을 받습니다.
Spring Boot는 Spring 필터 체인을 먼저 실행하도록 설정합니다.
Spring에서 관리하는 필터(예: OncePerRequestFilter)가 먼저 실행됩니다.
그 후, 서블릿 컨테이너에 등록된 일반 필터(javax.servlet.Filter)가 실행됩니다.
2024-10-08 15:35:41 INFO [c.n.a.f.CustomServletWrappingFilter .doFilterInternal : 24] [once-filter][REQUEST] [GET] /api/asdf
2024-10-08 15:35:41 INFO [c.n.aapoker.filter.RequestLogFilter .doFilter : 34] [filter][REQUEST] [GET] /api/asdf
2024-10-08 15:35:41 INFO [c.n.aapoker.filter.RequestInterceptor .preHandle : 21] [interceptor][REQUEST] [GET] /api/asdf
2024-10-08 15:35:41 INFO [c.n.aapoker.filter.RequestInterceptor .postHandle : 39] [interceptor][RESPONSE] [GET] /api/asdf 404 - 0.003 ms
2024-10-08 15:35:41 INFO [c.n.aapoker.filter.RequestLogFilter .doFilter : 43] [filter][RESPONSE] [GET] /api/asdf 404 - 0.01ms
2024-10-08 15:35:41 INFO [c.n.a.f.CustomServletWrappingFilter .doFilterInternal : 32] [once-filter][RESPONSE] [GET] /api/asdf 404 - 0.011ms
2024-10-08 15:35:41 INFO [c.n.aapoker.filter.RequestInterceptor .preHandle : 21] [interceptor][REQUEST] [GET] /error
2024-10-08 15:35:41 INFO [c.n.aapoker.filter.RequestInterceptor .postHandle : 39] [interceptor][RESPONSE] [GET] /error 404 - 0.013 ms
에러 화면을 요청할 경우 필터와 onceRequestFilter는 한번씩만 실행되고, 인터셉터는 스프링 진영에서 자동으로 route되는 error화면까지 잡힌다.
onceRequestFilter도 스프링에서 관리되는 필터지만 error를 잡지 않는다는 것을 확인!
참고) ContentCachingRequestWrapper
ContentCachingRequestWrapper는 요청(Request) 본문을 캐싱할 수 있도록 도와주는 래퍼 클래스입니다. 기본적으로, 서블릿 요청의 본문은 한 번만 읽을 수 있는 스트림 형태로 제공됩니다. 하지만 특정 상황에서는 요청 본문을 여러 번 읽어야 하거나, 로깅 또는 분석 목적으로 요청 본문을 캐싱하고자 할 때가 있습니다.