728x90
반응형
728x90
반응형
반응형

이전 글: 2022.03.21 - [개발/reactive] - [reactive] 6. refactoring

 

[reactive] 6. refactoring

이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet 오늘은 5강을 이어서 학습한다. 1. default, block..

bangpurin.tistory.com

 

 

CompletableFuture

  • 비동기 시작 supplyAsync vs runAsync
    • supplyAsync: 파라미터로 supplier 인터페이스 받고 반환 값(CompletableFuture <T>) 존재
    • runAsync: 파라미터로 runnable 인터페이스 받고 반환 값없음
  • 채이닝, 후속작업 thenApply vs thenAccept vs thenRun vs thenCompose vs thenCombine
    • thenApply: 데이터를 포함하는 future 반환(CompletableFuture <U>)
    • thenAccept: 파라미터로 Comsumer 받고 반환 값없음 (CompletableFuture <Void>)
    • thenRun: 파라미터로 runnable 받고 반환 값없음
    • thenCompose: 앞 단계의 CompletableFuture을 주고받고 하면서 순차적으로 연결 가능
    • thenCombine: 전혀 다른 CompletableFuture을 첫 인자로 받고, 결과를 연산하는 bifunction을 두 번째 인자로 받음
  • 데이터 가져오기(blocking) get vs join
    • get: checkedException 던지기 때문에 try/catch 처리 필요
    • join:  uncheckedException 던짐; 예외처리가 내부적
  • 감싸기 completedFuture: 이미 완료된 작업이나 정적인 값을 CompletableFuture로 감쌀 때
  • 예외 발생 시 exceptionally vs handle
    • exceptionally: 예외 발생 시 해당 예외를 받아서 처리 가능
    • handle: s, e를 다 받는 bifunction 구현체
  • 별도의 스레드 풀에서 작업 시 ~async 메서드
    • 다른 스레드가 후속 작업을 진행
    • thenApplyAsync
  • 여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다렸다가(blocking) 처리하려면 allOf

 

연습 코드

   ExecutorService es = Executors.newFixedThreadPool(10);
    //Future 비동기 작업 결과를 담고있는 오브젝트 get -> blocking
    //listenable future 콜백; 완료 시점에
    //CompletableFuture 비동기 작업의 결과를 간단하게 만들 수
    //리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 장점
    //병렬성과 동시성에서 CP가 의미있는데, 여러 cpu core 사이에 지연실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능

    //completion station? stage? 장점은 코어성능 20% 더 효율적임
    //then ~ 이전의 스레드를 사용
    //then~async 하면 정책에 따라 새로운 스래드 할당
    CompletableFuture
            .supplyAsync(() -> {
                log.info("run");
                //if(1==1) throw new RuntimeException(); //exception으로 감
                return 1; //생성
            })
            .thenCompose(s -> { //compose 하면 completedfutre의 값만 넘김
                log.info("then {}", s);
                return CompletableFuture.completedFuture(s + 1); //받아서 작업하고 넘기고
            })
            .thenApply(s3 -> {
                log.info("then3 {}", s3);
                return s3 * 10; //받아서 작업하고 넘기고
            })
            .exceptionally(e -> -10) //예외발생하면 복구할 때 사용가능
            .thenAcceptAsync(s2 -> log.info("thenn2 {}", s2), es) //받아서 끝 어떤 스래드풀인지 알려줘야
    ;
    log.info("exit");

 

이전 시간에 작성한 코드를 CompletableFuture 방식으로 바꿔본다. 수정된 부분 위주로 작성한다.

///before
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();

    ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
    f1.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
        f2.addCallback(s2 -> {
            ListenableFuture<String> f3 = myService.work(s2.getBody());
            f3.addCallback(s3 -> {
                dr.setResult(s3);
            }, e3 -> {
                dr.setErrorResult(e3.getMessage());
                    }
            );
        }, e2-> {
            dr.setErrorResult(e2.getMessage());
        });
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });
    return dr;
}
///after
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();

    toCF(rt.getForEntity(URL1, String.class, "f1" + idx))
            .thenCompose(s -> {
                if(1==1) throw new RuntimeException("ERROR");
                return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
            })
          //  .thenCompose(s2 -> toCF(myService.work(s2.getBody())))
            .thenApplyAsync(s2 -> myService.work(s2.getBody()))
            .thenAccept(s3 -> dr.setResult(s3))
            .exceptionally(e -> {
                dr.setErrorResult(e.getMessage());
                return null;
            })
    ;

    return dr;
}

//lf -> cf 변환
<T> CompletableFuture<T> toCF(ListenableFuture<T> lf){
    CompletableFuture<T> cf = new CompletableFuture<T>(); //작업의 결과를 나타내는거지 비동기 작업자체는 아님
    lf.addCallback(s -> {cf.complete(s);}, e -> {cf.completeExceptionally(e);});
    return cf;
}

////
@Service
public static class MyService{
    //@Async //cf를 쓴다면 동기로 하고 위에서 적용하는것도 방법
    public String work(String req){
        log.info("myservice {}" , req);
      //  return new AsyncResult<String>(req + "/asyncwork");
        return req + "/asyncwork";
    }
}

참고

https://brunch.co.kr/@springboot/267

 

CompletableFuture

자바 비동기 프로그래밍을 위한 CompletableFuture 검토 | 필자는 최근에 CompletableFuture 를 사용해서 개발을 진행하였는데, CompletableFuture는 작년에 한번 사용한 이후로는 실무에서 사용할 일이 거의

brunch.co.kr

https://wbluke.tistory.com/50

 

CompletableFuture 톺아보기

CompletableFuture 학습 테스트 안녕하세요! 이번 포스팅에서는 학습 테스트를 통해 CompletableFuture를 알아보는 시간을 가져보려고 합니다. 모든 코드는 GitHub에 있으니 참고하시면 됩니다. CompletableFutur

wbluke.tistory.com

 

728x90
반응형
반응형

이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet

 

[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet

오늘은 5강을 이어서 학습한다. 1. default, blocking request 아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다. @Sl..

bangpurin.tistory.com

 

 

이전 글의 콜백 지옥을 새로운 클래스 선언을 통해 리팩토링해본다.

//////////////before
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
f1.addCallback(s -> {
    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
    f2.addCallback(s2 -> {
        ListenableFuture<String> f3 = myService.work(s2.getBody());
        f3.addCallback(s3 -> {
            dr.setResult(s3);
        }, e3 -> {
            dr.setErrorResult(e3.getMessage());
                }
        );
    }, e2-> {
        dr.setErrorResult(e2.getMessage());
    });
}, e-> {
    //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
    dr.setErrorResult(e.getMessage());
});

///////////after
Completion
    //최초 실행
    .from(rt.getForEntity(URL1, String.class, "f1" + idx))
    //두번재 받아서 작업하고 return 해야함
    .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
    //어디서든 에러가 나면
    .andError(e -> dr.setErrorResult(e.toString()))
    //수행만하고 끝내는 아이 consumer
    .andAccept(s -> dr.setResult(s.getBody()));
public static class AcceptCompletion extends Completion{
    Consumer<ResponseEntity<String>> con;
    public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
        this.con = con;
    }

    @Override
    void run(ResponseEntity<String> val) {
        con.accept(val);
    }
}

public static class ErrorCompletion extends Completion{
    Consumer<Throwable> econ;
    //listenablefuture 실패 시 받는 throwable 처리
    public ErrorCompletion(Consumer<Throwable> econ) {
        this.econ = econ;
    }

    @Override
    void run(ResponseEntity<String> val) {
        //정상적일때 실행되면 안됨; 패스
        if(next != null){
            next.run(val);
        }
    }

    @Override
    void error(Throwable e) {
        econ.accept(e);
    }
}

public static class ApplyCompletion extends Completion{
    Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
    public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
        this.fn = fn;
    }

    @Override
    void run(ResponseEntity<String> val) {
        ListenableFuture<ResponseEntity<String>> lf = fn.apply(val);
        lf.addCallback(s -> complete(s), e-> error(e));
    }
}

public static class Completion{
    Completion next;

    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
        Completion c = new Completion();

        lf.addCallback(s -> {
            c.complete(s);
        }, e -> {
            c.error(e);
        });
        return c;
    }

    void error(Throwable e) {
        if(next != null){
            next.error(e);
        }
    }

    void complete(ResponseEntity<String> s) {
        if(next != null){
            next.run(s);
        }
    }

    void run(ResponseEntity<String> val) {
    }

    public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fun){
        Completion c = new ApplyCompletion(fun);
        this.next = c;
        return c;
    }

    public void andAccept(Consumer<ResponseEntity<String>> con){
        Completion c = new AcceptCompletion(con);
        this.next = c;
    }

    public Completion andError(Consumer<Throwable> econ){
        Completion c = new ErrorCompletion(econ);
        this.next = c;
        return c;
    }
}

위와 같이 만들자니 세 번째 콜을 아래와 같이 추가할 때 형 변환 에러가 난다.

  .andApply(s -> myService.work(s.getBody)

andApply는 request로 ResponseEntity<String>을 주고 response로 ListenableFuture<ResponseEntity<String>>를 받아야 하지만 work 함수는 그렇지 않다.

그러면 저 request/response를 만족하는 같은 내용의 함수를 또 작성해야 하는가?

뭐 그래도 작동은 하겠지만, 매번 함수를 작성하는 것은 깔끔하지 못하기에 generic을 통해 해결한다.

public static class AcceptCompletion<S> extends Completion<S, Void>{
    Consumer<S> con;
    public AcceptCompletion(Consumer<S> con) {
        this.con = con;
    }

    @Override
    void run(S val) {
        con.accept(val);
    }
}

public static class ErrorCompletion<T> extends Completion<T, T>{
    Consumer<Throwable> econ;
    //listenablefuture 실패 시 받는 throwable 처리
    public ErrorCompletion(Consumer<Throwable> econ) {
        this.econ = econ;
    }

    @Override
    void run(T val) {
        //정상적일때 실행되면 안됨; 패스
        if(next != null){
            next.run(val);
        }
    }

    @Override
    void error(Throwable e) {
        //계속 타고온 에러를 처리
        econ.accept(e);
    }
}

public static class ApplyCompletion<S, T> extends Completion<S, T>{
    Function<S, ListenableFuture<T>> fn;
    public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
        this.fn = fn;
    }

    @Override
    void run(S val) {
        ListenableFuture<T> lf = fn.apply(val);
        lf.addCallback(s -> complete(s), e-> error(e));
    }
}

public static class Completion<S, T>{
    Completion next;

    public static <S, T> Completion<S,T> from(ListenableFuture<T> lf) {
        Completion<S, T> c = new Completion<>();

        lf.addCallback(s -> {
            c.complete(s);
        }, e -> {
            c.error(e);
        });
        return c;
    }

    void error(Throwable e) {
        if(next != null){
            next.error(e);
        }
    }

    void complete(T s) {
        if(next != null){
            next.run(s);
        }
    }

    void run(S val) {
    }

    public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fun){
        Completion<T, V> c = new ApplyCompletion(fun);
        this.next = c;
        return c;
    }

    public void andAccept(Consumer<T> con){
        Completion<T, Void> c = new AcceptCompletion(con);
        this.next = c;
    }

    public Completion<T, T> andError(Consumer<Throwable> econ){
        Completion<T, T> c = new ErrorCompletion<>(econ);
        this.next = c;
        return c;
    }
}

 

728x90
반응형
반응형

오늘은 5강을 이어서 학습한다.

 

1. default, blocking request

아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다.

@Slf4j
public class LoadTest {
    static AtomicInteger cnt = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";

        CyclicBarrier barrier = new CyclicBarrier(101);

        for(int i = 0; i <100; i++){
            es.submit(() -> {
                int idx = cnt.addAndGet(1);

                //blocking till await을 만난 숫자가 100에 도달할 때 까지 ; 그 순간에 블라킹이 한번에 풀려; 스레드 동기화 가능; 100개가 동시에 시작
                //throw exception하기 때문에 callable 구현 필요
                barrier.await();

                log.info("thread {}", idx);
                StopWatch s1 = new StopWatch();
                s1.start();

                String res = rt.getForObject(url, String.class, idx);

                s1.stop();
                log.info("elapsed:{} -> {} / {}", idx, s1.getTotalTimeSeconds(), res);
                return null; //callable 구현한 인터페이스라고 생각함
            });
        }

        barrier.await();

        StopWatch sw = new StopWatch();
        sw.start();

        //graceful shutdown이라서 먼저 걸고 기다림
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        sw.stop();
        log.info("total time {}", sw.getTotalTimeSeconds());
    }
}
@SpringBootApplication
public class Application {

    @RestController
    public static class MyController{
        RestTemplate rt = new RestTemplate();

        //thread 1개
        //2초씩 대기; 톰캣 큐에 대기
        @GetMapping("/rest")
        public String rest(int idx){
            //RestTemplate blocking; 응답올때까지 대기; 스레드가 더 필요로하는 요청이라면 홀딩
            String res = rt.getForObject("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
            return res;
        }
    }


    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

//설정
server.tomcat.threads.max=1
spring.task.execution.pool.core-size=100
@SpringBootApplication
public class RemoteService {

    @RestController
    public static class MyController{
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(2_000);
            return req + "/service";
        }
    }

    public static void main(String[] args) {
    	//application.properties 오버라이드하는 설정
        System.setProperty("server.port", "8081");
        System.setProperty("server.tomcat.threads.max", "1000");
        SpringApplication.run(RemoteService.class, args);
    }
}
...
12:23:06.118 [pool-1-thread-39] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:06.119 [pool-1-thread-39] INFO toby.live.lecture4.LoadTest - elapsed:39 -> 8.463001595 / hello39/service
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:08.127 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 10.472331585 / hello30/service
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:10.140 [pool-1-thread-34] INFO toby.live.lecture4.LoadTest - elapsed:34 -> 12.485099461 / hello34/service
...

2초 이상이 걸리는 100건의 작업이고 응답을 기다리는 동안은 블로킹되기 때문에 총 200초가량 소요된다. 허나 복잡한 로직도 아니고, 사실 cpu는 놀고 있을 것이다.. 이를 비동기로 바꿔보자.

 

2. AsyncRestTemplate(spring5.0/springboot2.0에서 deprecated)

위 소스에서 /rest의 rest template만 바꿔본다.

@RestController
public static class MyController{
    AsyncRestTemplate rt = new AsyncRestTemplate();

    //thread 1개
    @GetMapping("/rest")
    public ListenableFuture<ResponseEntity<String>> rest(int idx){
        //호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
        //비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
        ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
        return res;
    }
}
...
12:42:49.442 [pool-1-thread-54] INFO toby.live.lecture4.LoadTest - elapsed:54 -> 2.516070823 / hello54/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.444 [pool-1-thread-12] INFO toby.live.lecture4.LoadTest - elapsed:12 -> 2.513689601 / hello12/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 2.518126106 / hello98/service
12:42:49.445 [pool-1-thread-89] INFO toby.live.lecture4.LoadTest - elapsed:89 -> 2.515622072 / hello89/service
12:42:49.446 [main] INFO toby.live.lecture4.LoadTest - total time 2.527531376

이렇게 실행하면 총 실행시간이 2초 남짓으로 마치 요청 한 건을 한 것과 비슷하게 나온다. 하지만 자바에서 제공하는 분석 툴인 jmc를 이용하여 /rest를 실행한 애플리케이션의 thread의 개수를 확인해보면..

서블랫 스래드는 하나만 만들어져야 하는데, 사실 뒤에서는 100개 이상의 스레드를 추가적으로 생성해서 요청하고 있음을 알 수 있다. 톰캣이 100개의 스레드를 만들었나 싶은데, 사실 톰캣 스래드는 사진에서 보는 것과 같이 하나고, 자바 api를 통해 백그라운드에서 100개 스레드를 만들어 놓고 실행하는 것이다. 즉 매번 서버 자원을 사용한다는 것인데, 이는 비효율적이다. 최소한의 자원을 사용해야 한다.

 

3. netty / nonblocking io를 사용하는 client lib 사용

/rest 부분의 rest template를 async rest template으로 변경하고 네티를 사용한다. 네티의 스레드를 1개로 설정한다.

@RestController
public static class MyController{
    //Netty4ClientHttpRequestFactory spring 제공
    //netty도 스레드 1개
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    //thread 1개
    @GetMapping("/rest")
    public ListenableFuture<ResponseEntity<String>> rest(int idx) throws ExecutionException, InterruptedException {
        log.info("/rest {}", idx);
        //호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
        //비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
        ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
        return res;
    }
}

그리고 실행을 하면 처음에는 3초가 걸리고, 다시 또 실행을 하면.. 8초가 걸리고..  다시 실행하면 timeout이 난다..

토비님과는 다른 결과.. 왜지ㅠ

...
15:13:45.147 [pool-1-thread-64] INFO toby.live.lecture4.LoadTest - elapsed:64 -> 3.230518378 / hello64/service
15:13:45.148 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 3.229926106 / hello82/service
15:13:45.148 [pool-1-thread-53] INFO toby.live.lecture4.LoadTest - elapsed:53 -> 3.229702047 / hello53/service
15:13:45.148 [pool-1-thread-62] INFO toby.live.lecture4.LoadTest - elapsed:62 -> 3.229505908 / hello62/service
15:13:45.149 [main] INFO toby.live.lecture4.LoadTest - total time 3.242286844

...
15:15:27.268 [pool-1-thread-60] INFO toby.live.lecture4.LoadTest - elapsed:60 -> 8.542415481 / hello60/service
15:15:27.268 [main] INFO toby.live.lecture4.LoadTest - total time 8.556960287

...
15:18:02.122 [pool-1-thread-26] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-18] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.126 [pool-1-thread-12] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.129 [main] INFO toby.live.lecture4.LoadTest - total time 30.97585281

 

타임아웃이 나는 현상을 지켜보니 아래와 같은 순서로 진행되었다.

  1. LoadTest -> /rest로 100건 요청 
  2. /rest에서 100건 요청받고 -> /service로 요청해야 하는데 /service 쪽 로그에 아무것도 안 찍힘
  3. 1분 뒤에 /rest에서 async request timeout exception 걸려서 503 return
  4. 그 후에 /service가 요청을 받아서 처리 하지만 /rest에서는 이미 exception 처리돼서 무쓸모

왜 /rest는 요청을 바로 받는데 /service로 바로 안 보내고 1분 후에 한 번에 보내는 걸까.. block 되어 있는 건가..

/rest가 1개의 스레드에서 도는데 어디선가 블라킹이 있어서 안 되는 것 같은데, 왠지 모르겠다. 느낌은 저 시대와 지금의 기본적인 풀 세팅 정보가 달라서 그럴 것 같다.

 

어쨌건 스레드를 100개씩이나 더 생성하지 않는 것은 확인했다.

 

자꾸 타임아웃이 나지만 우선은 진행한다.

ListenableFuture로 응답을 그대로 내리면 응답을 꺼내서 수정할 수가 없다. 이때 사용 가능한 것이 DeferredResult이다.

ListenableFuture에 콜백을 달고 DeferredResult에 setResult를 하면 응답을 수정할 수 있다

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();
    
    ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
    //res.get -> blocking 으로 바뀌어버림 의미없음
    //callback이 실행되는거지 리턴을 주는게 아님; 응답을 받고 변형하려면? deferredReuslt!
    res.addCallback(s -> {
        dr.setResult(s.getBody() + "/work");  //응답 변형
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });

    return dr;
}

중첩 콜을 원한다면..

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();
    ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
    res.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
        f2.addCallback(s2-> {
            dr.setResult(s2.getBody());
        }, e2-> {
            dr.setErrorResult(e2.getMessage());
        });
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });
    return dr;
}

2초짜리 콜을 2번 중첩해서(/service2는 /service의 카피본이다) 4초가량이 걸린다. (하지만 역시 나는 위와 같은 현상이 계속되어서 안되었다..)

 

3번 중첩한다면..

@Slf4j
@EnableAsync
@SpringBootApplication
public class Application {
    @RestController
    public static class MyController{
        //Netty4ClientHttpRequestFactory spring 제공
        //netty도 스레드 1개
        AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

        @Autowired
        MyService myService;

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        //thread 1개
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
            log.info("/rest {}", idx);
            DeferredResult<String> dr= new DeferredResult<>();

            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
            f1.addCallback(s -> {
                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
                f2.addCallback(s2 -> {
                    ListenableFuture<String> f3 = myService.work(s2.getBody());
                    f3.addCallback(s3 -> {
                        dr.setResult(s3);
                    }, e3 -> {
                        dr.setErrorResult(e3.getMessage());
                            }
                    );
                }, e2-> {
                    dr.setErrorResult(e2.getMessage());
                });
            }, e-> {
                dr.setErrorResult(e.getMessage());
            });
            return dr;
        }
    }

    @Service
    public static class MyService{
        @Async
        public ListenableFuture<String> work(String req){
            return new AsyncResult<String>(req + "/asyncwork");
        }
    }

    @Bean
    ThreadPoolTaskExecutor myPool(){
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1); //queue까지 다 차면 맥스
        te.initialize();
        return te;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

주의해야 하는 것은 어디서든 다 async, 비동기가 되게끔 작성해야 한다는 것, 아니면 순식간에 동기로 바뀐다.

 

 

728x90
반응형
반응형

이전 글: 2022.03.16 - [개발/reactive] - [reactive] 4-1. java Future/FutureTask/Callable/Runnable

 

[reactive] 4-1. java Future/FutureTask/Callable/Runnable

callable vs runnable interface Runnable interface Callable interface java version package java 1.0 ~ java.lang java 1.5 ~ java.util.concurrent return 계산 결과를 받을 수 없음 계산 결과 받을 수 있음(..

bangpurin.tistory.com

이전 글과 오늘 글은 아래 유튜브를 보고 작성하였다.

 

1. using Spring Async

@Slf4j
@EnableAsync  //있어야 async 작동
@SpringBootApplication
public class Spring {

    @Component
    public static class MyService{
        //future인데 등록해서 받아볼 수 있게하는, 스프링이 만든 ListenableFuture
        //요청할 때 마다 스레드를 계속 만들어 비효율적인 simpleAsyncTaskExecutor -> 빈으로 풀 등록해서 쓰자
        //비동기가 여러개고 스레드 풀을 분리해서 지정하고 싶으면 Async("name") 지정 가능
        @Async
        public ListenableFuture<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(2_000);
            return new AsyncResult<>("hello");
        }
    }

    //task executor를 빈으로 해두면 알아서 가져가서 쓰게 되어있음
    @Bean
    ThreadPoolTaskExecutor threadPoolExecutor(){
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);//기본으로 만드는거; 첫번째 스레드 요청이 올 때; jmx를 통해 런타임에도 값 수정이 가능
        te.setQueueCapacity(200);//기본 풀이 다 차면 큐에 먼저 차
        te.setMaxPoolSize(100);//큐가 꽉 찼을 때 에러나는데 그걸 막아주려고 하는 설정
        te.setThreadNamePrefix("async-");
        te.initialize();
        return te;
    }

    public static void main(String[] args) {
        //try with resource로 실행하면 작업 후 알아서 종료
        try(ConfigurableApplicationContext c = SpringApplication.run(Spring.class, args)){

        }
    }

    @Autowired MyService myService;

    //모든 빈 등록 마치면 바로 실행 됨
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
            log.info("Exit");
        };
    }
}

여기서 주의할 점.

ThreadPoolTaskExecutor에 max pool size에 설정하는 값은 큐가 다 차고 나서 늘어가는 풀 사이즈이다.

https://www.baeldung.com/java-threadpooltaskexecutor-core-vs-max-poolsize

 

참고로 HttpServletRequest/Response -> 자바의 InputStream 을 구현한 것으로, blocking방식이다.


2. using Spring Web - default

우선 서버 설정 값

#서블릿 스레드
server.tomcat.threads.max=1
#작업스레드풀?
spring.task.execution.pool.core-size=100
@Slf4j
public class LoadTest {
    static AtomicInteger cnt = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/callable";

        StopWatch sw = new StopWatch();
        sw.start();

        for(int i = 0; i <100; i++){
            es.execute((() -> {
                int idx = cnt.addAndGet(1);
                log.info("thread {}", idx);
                StopWatch s1 = new StopWatch();
                s1.start();
                rt.getForObject(url, String.class);
                s1.stop();
                log.info("elapsed:{} -> {}", idx, s1.getTotalTimeSeconds());
            }));
        }
     
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        sw.stop();
        log.info("total time {}", sw.getTotalTimeSeconds());
    }
}
@RestController
public static class MyController{
    @GetMapping("/callable")
    public String callable() throws InterruptedException {
        log.info("async");
        Thread.sleep(2_000);
        return "hello";
    }
}

위 요청은 아래 그림과 같이 표현된다.

load test는 100개의 스래드를 만들어 거의 동시에 요청했으나, 순차 처리로 인해 총 소요 시간은 200초가 넘는다.

...
13:04:56.428 [pool-1-thread-50] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.428 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-90] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
...
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:04.994 [pool-1-thread-61] INFO toby.live.lecture4.LoadTest - elapsed:61 -> 188.612548654
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:06.999 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 190.618529907
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:09.003 [pool-1-thread-71] INFO toby.live.lecture4.LoadTest - elapsed:71 -> 192.622624014
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:11.009 [pool-1-thread-32] INFO toby.live.lecture4.LoadTest - elapsed:32 -> 194.629844982
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:13.013 [pool-1-thread-35] INFO toby.live.lecture4.LoadTest - elapsed:35 -> 196.631976988
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:15.016 [pool-1-thread-59] INFO toby.live.lecture4.LoadTest - elapsed:59 -> 198.634385976
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:17.019 [pool-1-thread-65] INFO toby.live.lecture4.LoadTest - elapsed:65 -> 200.638424197

/callable 쪽은 요청을 거의 동시에 100개의 요청을 받았지만 일반 동기 방식이므로 하나씩 처리한다.

...
2022-03-17 13:08:06.998  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:09.002  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:11.008  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:13.012  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:15.014  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async

 

3. callable

/callable 쪽만 소스를 수정한다.

@RestController
public static class MyController{
    @GetMapping("/callable")
    public Callable<String> callable() throws InterruptedException {
        log.info("callable");
        //Callable Interface 익명 클래스
        return () -> {
            log.info("async");
            Thread.sleep(2_000);
            return "hello";
        };
    }

callable은 요청을 받으면 별도의 스레드를 생성해 일을 병렬로 처리한다.

100개의 스래드가 거의 동시에 요청을 보내 100개의 작업 스래드가 받아서 동시에 처리하기 때문에 총 소요시간도 2초 남짓이다.

...
13:33:24.694 [pool-1-thread-63] INFO toby.live.lecture4.LoadTest - elapsed:63 -> 2.289299445
13:33:24.694 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 2.291355199
13:33:24.695 [pool-1-thread-67] INFO toby.live.lecture4.LoadTest - elapsed:67 -> 2.292713823
13:33:24.695 [pool-1-thread-81] INFO toby.live.lecture4.LoadTest - elapsed:80 -> 2.289499954
13:33:24.695 [pool-1-thread-3] INFO toby.live.lecture4.LoadTest - elapsed:3 -> 2.290040178
13:33:24.695 [pool-1-thread-57] INFO toby.live.lecture4.LoadTest - elapsed:57 -> 2.292708376
13:33:24.696 [pool-1-thread-44] INFO toby.live.lecture4.LoadTest - elapsed:44 -> 2.29307173
13:33:24.696 [main] INFO toby.live.lecture4.LoadTest - total time 2.307467369

/callable이 1개의 서블릿 스레드로 받았지만 별도의 작업 스래드로 바로 토스하여 일을 하는 모습..

...
2022-03-17 13:33:22.624  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.624  INFO 23081 --- [        task-98] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:33:22.625  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.625  INFO 23081 --- [        task-99] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:33:22.625  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.625  INFO 23081 --- [       task-100] toby.live.lecture4.SpringWeb             : async

 


4. DeferredResult

DeferredResult란 “지연된 결과”를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 비동기 작업이다. 

기존에 /callable api가 있던 곳에 아래와 같이 신규 api를 추가해준다.

Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();

@GetMapping("/dr")
public DeferredResult<String> dr(){
    log.info("dr");
    //DR은 setResult가 오기전 까지 응답을 끊지 않고 대기, 그래서 테스트 시 타임아웃 시간을 길게
    //결과가 오면 바로 return
    DeferredResult<String> dr = new DeferredResult<>(600_000L);
    results.add(dr);
    return dr;
}

@GetMapping("/dr/count")
public String drcount(){
    return String.valueOf(results.size());
}

@GetMapping("/dr/event")
public String drevent(String msg){
    for(DeferredResult<String> dr: results){
        dr.setResult("hello " + msg);
        results.remove(dr);
    }
    return "OK";
}

load test에 /dr을 쏘게끔 설정하고 실행하면 100개의 요청을 우르르 보내지만 아무 응답을 받지 않고 대기하는 모습을 보이는데

이때 /dr/count를 쏴보면 100개의 요청이 큐에 들어있는 것을 확인할 수 있다.

그 상태에서 /dr/event?msg=today 를 요청하면 그제야 100개의 응답이 한 번에 온다. setResult라는 이벤트를 주면 리턴하는 방식이다.

...
13:48:17.039 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 213.022709247
13:48:17.039 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 213.027111781
13:48:17.039 [pool-1-thread-16] INFO toby.live.lecture4.LoadTest - elapsed:16 -> 213.026715767
13:48:17.040 [pool-1-thread-1] INFO toby.live.lecture4.LoadTest - elapsed:1 -> 213.025866684

 

5. ResponseBodyEmitter

ResponseBodyEmitter는 하나의 요청에 대한 응답을 여러번에 나눠서 보낼 때 사용된다.

//여러번에 나눠서 데이터를 보낸다
@GetMapping("/emitter")
public ResponseBodyEmitter emitter(){
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    Executors.newSingleThreadExecutor().submit(() -> {
        try {
            for(int i =0; i<=50; i++){
                emitter.send("<p> stream " +i+ "</p>");
                Thread.sleep(100);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    return emitter;
}

/emitter를 브라우저로 실행하면 stream 문구가 100ms 마다 계속 뜨는 것을 볼 수 있다. 위에 상태 표시에도 로딩으로 되어 있어 계속 응답을 받아오고 있음을 알 수 있다.


참고

https://jongmin92.github.io/2019/03/31/Java/java-async-1/

 

자바와 스프링의 비동기 기술

해당 포스팅은 토비님의 토비의 봄 TV 8회 스프링 리액티브 프로그래밍 (4) 자바와 스프링의 비동기 기술 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다. 실습 코드들은 Inte

jongmin92.github.io

 

728x90
반응형
반응형

이전 글: 2022.01.28 - [개발/spring] - [actuator] git info를 health에 포함하기

 

[actuator] git info를 health에 포함하기

환경: java11, springboot2.6.2, gradle7.1 spring actuator? 스프링 부트 기반의 애플리케이션에서 제공하는 여러가지 정보를 쉽게 모니터링하게 하는 라이브러리 DB연결상태, disk space, rabbit, reddis, 커스..

bangpurin.tistory.com

 

이전에 프로젝트에 actuator를 적용하여 /health url을 통해 배포된 소스의 해쉬, 브랜치, 잘 떠있는지 유무 등을 알 수 있었다. 

스프링 actuator는 metrics를 통해 애플리케이션의 현 자원 상태(스레드 풀 상태, db 커넥션 상태, jvm gc상태 등)를 알 수 있게 하는데, 이를 주기적으로 요청하여 그 흐름을 관찰할 수도 있다. 특히 스프링에서 지원하는 prometheus를 추가적으로 설정하면 prometheus서버가 원하는 포맷으로 변환해주기 때문에 쉽게 prometheus서버와 연동이 가능하다.

 

오늘은 서비스 모니터링에 주로 쓰이는 prometheus를 추가적으로 설정해본다.

이전 글에서 설정한 그대로 작업 시작한다.

 

1. gradle build에 추가

//기존에 있었음
implementation 'org.springframework.boot:spring-boot-starter-actuator'
//actuator prometheus 추가
implementation("io.micrometer:micrometer-registry-prometheus")

 

2. application.properties 추가

//prometheus 추가
management.endpoints.web.exposure.include=health, shutdown, info, prometheus
//이미 있었음
management.endpoint.health.enabled=true
//추가
management.endpoint.prometheus.enabled=true

 

이렇게 하고 서버를 재시작하면 /actuator/prometheus 가 활성화된다.

/actuator/prometheus

 

이외 추가적인 정보에 대한 설정은 prometheus.yml을 생성하여 작성하면 된다.


참고

https://pyxispub.uzuki.live/?p=1810 

 

Spring Boot + Actuator + Micrometer로 Prometheus 연동하기

이제까지 블로그에서 Prometheus, Grafana 에 대해 여러 번 다룬 적이 있었다. Monitoring with cAdvisor + Prometheus + Grafana (https://pyxispub.uzuki.live/?p=1764)Alert with Grafana(https://pyxispub.uzuki.live/?p=1779) 두번째 글 까

pyxispub.uzuki.live

https://happycloud-lee.tistory.com/217

 

[SC11] Spring Boot Actuator 이란 ?

1. Spring Boot Actuator 이해 1) WHY? 각 마이크로서비스는 고유의 목적을 가지고 개발되고 운영됩니다. 하지만 모든 마이크로서비스에는 공통으로 요구되는 기능이 있습니다. 예를 들어 Health Check와 상

happycloud-lee.tistory.com

 

728x90
반응형
반응형

callable vs runnable interface

  Runnable interface Callable interface
java version
package
java 1.0 ~
java.lang
java 1.5 ~
java.util.concurrent
return 계산 결과를 받을 수 없음 계산 결과 받을 수 있음(a generic value V)
throw checked exception을 throw 불가 checked exception을 throw 가능
override method run() call()

https://www.geeksforgeeks.org/difference-between-callable-and-runnable-in-java/

 

Difference Between Callable and Runnable in Java - GeeksforGeeks

A Computer Science portal for geeks. It contains well written, well thought and well explained computer science and programming articles, quizzes and practice/competitive programming/company interview Questions.

www.geeksforgeeks.org

 

자바 비동기 맛보기

0. ExecutorService 특징

  • es.execute vs es.submit 
  • 시간이 오래 걸리는 작업을 할 때는 submit 추천. future object를 리턴하기 때문에 나중에 get으로 꺼낼 수 있음(get은 블로킹!)
  • 그냥 스레드의 병렬 처리만 필요하면 execute
  execute() submit()
accept Runnable Runnable and Callable
declared in Executor interface ExecutorService interface
return void a Future object
  • es.shutdown 은 graceful shutdown이라서 할 거 다 할 때까지 기다림
  • es.shutdownNow 가 강제 종료
  • es.awaitTermination 은 타임아웃을 기다리는 용도로 실제 shutdown 시키지는 않음
  • 따라서 shutdown 먼저 걸로 awaitTermination을 써야 함

 

1. Future class

public static void main(String[] args) throws InterruptedException, ExecutionException {
    //스래드를 새로 만들고 반납하는 것이 비용이 많이 들어가니, 풀이 나옴
    //맥시멈 제한없고 처음에 스레드 없음 요청할 때 새로 만들고 다음 요청 시 기존 만든 스레드 실행
    ExecutorService es = Executors.newCachedThreadPool();
    //es.execute -> void

    //Future 비동기 핸들러지 결과물은 아님
    Future<String> f = es.submit(() -> {
        Thread.sleep(5_000);
        log.info("helloo");
        return "hello";
    });
    //f.isDone() -> non blocking method;
    System.out.println(f.isDone());
    log.info("before");
    //f.get -> blocking method
    System.out.println(f.get()); //비동기가 완성될 때 까지 블로킹(20초 기다림)
    log.info("exit");//맨 마지막
    System.out.println(f.isDone());
}

 

2. FutureTask class

Future 자체를 object로 만들어줌

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    FutureTask<String> f = new FutureTask<String>(() -> {
       Thread.sleep(3_000);
       log.info("async");
       return "hello";
    }){//익명클래스
        @Override
        protected void done() { //다 하면 이거해; f.get 할 필요없음
            try {
                System.out.println(get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    };

    es.execute(f);
    System.out.println(f.isDone());
    //System.out.println(f.get());
    es.shutdown(); //다 끝나면 종료
}

 

3. FutureTask 확장

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    CallbackFutureTask f = new CallbackFutureTask(() -> {
        Thread.sleep(3_000);
        if(1 == 1){
            throw new RuntimeException(">async error");
        }
        log.info("async");
        return "hello";
    }, s -> System.out.println("success:" + s)
    , e -> System.out.println(">error: " + e.getMessage())
    );

    es.execute(f);
    es.shutdown(); //다 끝나면 종료
}

interface SuccessCallback{
    void onSuccess(String result);
}

interface ExceptionCallback{
    void onError(Throwable t);
}

public static class CallbackFutureTask extends FutureTask<String>{
    SuccessCallback sc;
    ExceptionCallback ec;

    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
        super(callable);
        //if(sc == null) throw null; //null point exception
        this.sc = Objects.requireNonNull(sc); //if null -> NPE
        this.ec = Objects.requireNonNull(ec);
    }

    @Override
    protected void done() {
        try {
            sc.onSuccess(get());
        } catch (InterruptedException e) { //종료 시그널
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) { // 찐에러
            ec.onError(e.getCause());
        }
    }
}
728x90
반응형
반응형

한 트랜젝션에서 서로 다른 DB에 update/insert 작업을 연속적으로 하게 될 때 이를 하나의 트랜젝션으로 보존할 수 있도록 스프링에서는 ChainedTransactionManager라는 클래스를 제공해주었다.

물론 이는 1, 2 ->  2, 1 구조로 완벽한 commit/rollback 로직은 아니었지만, 대부분/정상적인/예측 가능한 로직 상 사용 가능한 대안이었다.

 참고) 1, 2 ->  2, 1 구조란? 

위와 같은 구조로 2 커밋 후 1 롤백 시, 2가 롤백되지 않는 단점이 있다.

참고) 완벽한 로직이 아닌 이유: https://taes-k.github.io/2020/06/09/chained-transaction-manager/

 

ChainedTransactionManager 데이터소스 트랜잭션 연결하기

다중 데이터소스 트랜잭션 연결 요새는 MSA로 시스템을 구축하여 서버와 DB도 모두 각각 분리하여 마이크로하게 시스템 설계를 하는 추세이지만, 서비스에 따라 여러가지 이유로인해 여러개의 Da

taes-k.github.io

 

허나 springboot 2.6.2에서 사용하려고 봤더니 아래와 같이 deprecated 되었다고 나오는 것이 아닌가..

//deprecated since 2.5
@Deprecated
public class ChainedTransactionManager implements PlatformTransactionManager

 

그래서 대안이 뭔고 봤더니, TransactionSynchronization를 사용하라고 한다.

Instead of using ChainedTransactionManager for attaching callbacks to transaction commit (pre commit/post commit), either register a TransactionSynchronization to explicitly follow transaction cleanup with simplified semantics in case of exceptions.
See Also: TransactionSynchronization.beforeCommit(boolean), TransactionSynchronization.afterCommit()

 

저게 어떤 역할을 하는지, 어떻게 사용하는지는.. 좀 더 공부가 필요하다.

 


참고)

chainedTransactionConfig? https://techfinanceworld.com/?p=494 

 

Chained Transaction Manager in Spring

Suppose we have multiple databases to which we need to query under a spring transaction. So just a @transactional annotation against a single database transaction manager won't work. So for multi resource, we can use XA i.e; instead of multiple transaction

techfinanceworld.com

스프링 진영에서 없애려는 이유 https://github.com/spring-projects/spring-data-commons/issues/2232

 

Deprecate ChainedTransactionManager [DATACMNS-1817] · Issue #2232 · spring-projects/spring-data-commons

Mark Paluch opened DATACMNS-1817 and commented ChainedTransactionManager is the primary class in org.springframework.data.transaction that is used for multi-transactionmanager arrangements. It is u...

github.com

 

728x90
반응형
반응형

환경: springboot2.6.2 / java 11

로그 레벨을 DEBUG로 프로젝트를 실행하니 아래와 같은 에러가 빵빵 지나가는 것이 아닌가..

grpc를 사용하는 프로젝트라, grpc server가 내려가 있어서 발행하는 것이 아닌가 싶어서 기동 후 다시 봤는데도 마찬가지였다.

java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled
	at io.grpc.netty.shaded.io.netty.util.internal.ReflectionUtil.trySetAccessible(ReflectionUtil.java:31)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$4.run(PlatformDependent0.java:239)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0.<clinit>(PlatformDependent0.java:233)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.isAndroid(PlatformDependent.java:294)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.<clinit>(PlatformDependent.java:93)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:223)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:210)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.cached(AsciiString.java:1401)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<clinit>(AsciiString.java:48)
	at io.grpc.netty.shaded.io.grpc.netty.Utils.<clinit>(Utils.java:78)
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)
    
    ...
    
    
    
java.lang.IllegalAccessException: class io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @61874b9d
	at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:361)
	at java.base/java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:591)
	at java.base/java.lang.reflect.Method.invoke(Method.java:558)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6.run(PlatformDependent0.java:353)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0.<clinit>(PlatformDependent0.java:344)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.isAndroid(PlatformDependent.java:294)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.<clinit>(PlatformDependent.java:93)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:223)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:210)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.cached(AsciiString.java:1401)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<clinit>(AsciiString.java:48)
	at io.grpc.netty.shaded.io.grpc.netty.Utils.<clinit>(Utils.java:78)
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)

 

원인: 자바 11부터 Object의 접근권한이 더이상 증가하지 않는다는 에러. 에러가 아예 안 나려면(정상 액션을 원한다면) 자바 8을 사용하라는데, 발전하는 IT 시대에 역행은 오버라고 생각한다.. 자바 11에 AccessibleObject라는 새로운 클래스가 생겼으니, 아마 관련 라이브러리들이 곧 수정하지 않을까 하는 생각..

https://stackoverflow.com/questions/60241857/java-lang-unsupportedoperationexception-reflective-setaccessibletrue-disabled

 

java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled

When I run my Ktor application with gradle run then I've got the following exception: 19:21:11.795 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default l...

stackoverflow.com

 

해결:

관련 에러에 대해 검색해보면 이것저것 많이 나오는데, 특히 뜰 때 옵션에 아래 등과 같은 옵션을 주라고 한다.

-Dio.netty.tryReflectionSetAccessible=true

설정해보았지만 여전히 에러같이 생긴 친구들이 지나가서 굉장히 신경 쓰인다..

사실 저 로그는 에러라기보다는 디버그에 가깝다(고 한다..). 그래서 그냥 해당 로그 레벨을 ERROR로 바꿔버렸다..

<logger name="io.grpc.netty" level="ERROR"/>

 

사실 에러(?)를 무시하는 격이라 좀 찝찝하긴 하지만, 아직까지는 더 나은 방안은 없어 보인다. 기다려보자.

728x90
반응형
반응형

세 번째 강의 시작한다.

 

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    pub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [main] DEBUG toby.live.SchedulerEx  - onNext:1
2    [main] DEBUG toby.live.SchedulerEx  - onNext:2
2    [main] DEBUG toby.live.SchedulerEx  - onNext:3
2    [main] DEBUG toby.live.SchedulerEx  - onNext:4
2    [main] DEBUG toby.live.SchedulerEx  - onNext:5
2    [main] DEBUG toby.live.SchedulerEx  - onComplete
EXXXXIT

위 코드를 돌리면 아래와 같이 로그가 찍힌다. 보이는 것처럼 main 스레드가 모든 일을 다 하고 있고, 만약 중간에 블로킹이 걸리면 main스레드는 진행을 하지 못하게 된다. pub - sub 사이의 일은 다른 스레드가 하게끔 하고 main스레드는 지나치게 하려면 어떻게 할까

 

subscribeOn

  • 주는자(publisher)가 느리거나 예측 불가할 경우 & 처리(consumer)가 빠를 때
  • subscriber를 별도의 스레드로 분리
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(); //1개 스레드, 몰리면 큐에 넣고 순차처리
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    subOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

아까랑 다르게 main 스레드에서 exit을 먼저 하고 나머지 스레드로 작업한다.

 

publishOn

  • 데이터 생성(publisher)은 빠른데 처리하는 곳(consumer)이 느릴 때 
  • publisher를 별도의 스레드로 분리
  • 데이터 생성은 메인에서 빨리해 근데 가져가는 건 느리니까 별도에서 진행해
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(); //코어 갯수가 하나라서 큐에 넣어서 안꼬임

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
1    [main] DEBUG toby.live.SchedulerEx  - subscription request
EXXXXIT
4    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

이 친구는 onSubscribe, exit까지 main 스레드에서 해버리고 나머지를 할당한 별도의 스레드에서 진행한다.

 

publishOn / subscribeOn 개념을 둘 다 사용할 수도 있다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){ //1개 스레드, 몰리면 큐에 넣고 순차처리
            @Override
            public String getThreadNamePrefix() {
                return "subOn-";
            }
        }); 
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        subOnPub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                @Override
                public String getThreadNamePrefix() {
                    return "pubOn-";
                }
            });

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [subOn-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [subOn-1] DEBUG toby.live.SchedulerEx  - subscription request
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:1
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:2
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:3
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:4
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:5
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onComplete

각 스레드에 이름을 주어 로그를 확인해봤더니, 메인은 메인대로 진행하였고(exxit) onSubscribe, request 까지는 subOn 스레드에서 진행하였고 나머지는 pubOn 스레드에서 진행함을 알 수 있다.

 

원리는 이렇고, flux로 구현하면 아래와 같다. 상황에 따라 publishOn/subscribeOn을 하나만 쓸 수도, 모두 쓸 수도 있겠다.

public static void main(String[] args) {
    Flux.range(1, 10) //1부터 10개
        .publishOn(Schedulers.newSingle("pub"))
        .log()
        .subscribeOn(Schedulers.newSingle("sub"))
        .subscribe(System.out::println); //onNext
    System.out.println("exit");
}
exit
32   [sub-1] INFO  reactor.Flux.PublishOn.1  - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
33   [sub-1] INFO  reactor.Flux.PublishOn.1  - | request(unbounded)
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(1)
1
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(2)
2
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(3)
3
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(4)
4
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(5)
5
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(6)
6
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(7)
7
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(8)
8
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(9)
9
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(10)
10
35   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onComplete()

 

flux.interval

메인 스레드 종료 후 별도 스레드에서 200ms마다 하나씩 숫자를 받아서 실행. 5초 지나면 종료되는 코드 작성.

public static void main(String[] args) throws InterruptedException {

    //user thread 하나라도 있으면 강종안함
    //daemon thread 만 남으면 강종
    Flux.interval(Duration.ofMillis(200))//주기를 가지고 숫자를 증가; 무한으로
        .take(10) //10개 받으면 끝; 중지시킨다
        .subscribe(s -> log.debug("onNext::{}", s));

    log.debug("Exit");
    TimeUnit.SECONDS.sleep(5); //그래서 유저 스래드를 만들어줘야 실행됨
}
15   [main] DEBUG toby.live.FluxScEx  - Exit
223  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::0
419  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::1
619  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::2
818  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::3
1015 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::4
1219 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::5
1419 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::6
1618 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::7
1817 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::8
2019 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::9

 

이를 reactor stream 없이 구현한다면?

newSingleThreadScheduledExecutor.scheduleAtFixedRate 이란 함수와 cancel을 복합 구현하여 만들면 된다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = sub -> {
        sub.onSubscribe(new Flow.Subscription() {
            int no = 0;
            boolean cancelled = false;

            @Override
            public void request(long n) {
                ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

                exec.scheduleAtFixedRate(() -> {  //interval
                    if(cancelled){
                        exec.shutdown();
                        return;
                    }
                    sub.onNext(no++);
                }, 0, 300, TimeUnit.MILLISECONDS); //죽을 때 까지 계속 저 주기로 날려
            }

            @Override
            public void cancel() {
                log.debug("cancel called");
                cancelled = true;
            }
        });
    };

    Flow.Publisher<Integer> takePub = sub -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {
            int count = 0;
            Flow.Subscription subs;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                sub.onSubscribe(subscription);
                subs = subscription;
            }

            @Override
            public void onNext(Integer item) {
                sub.onNext(item);
                if(++count >= 5){
                    subs.cancel();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                sub.onError(throwable);
            }

            @Override
            public void onComplete() {
                sub.onComplete();
            }
        });
    };

    takePub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
}
0    [main] DEBUG toby.live.IntervalEx  - onSubscribe
8    [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:0
309  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:1
607  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:2
908  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:3
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:4
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - cancel called
728x90
반응형
반응형

이전 글: 2022.02.10 - [개발/spring] - [개념] interceptor vs filter 그리고 ContentCachingRequestWrapper

 

[개념] interceptor vs filter 그리고 ContentCachingRequestWrapper

매번 보고 또 봐도 까먹고 또 까먹고 헷갈리고 당황하는 것이 이 쪽 친구들인 것 같다. interceptor, filter, aop, servlet, servlet container, dispatcher servlet... 반복학습을 하면 할수록 점점 마스터할 수..

bangpurin.tistory.com

 

이전에 filter와 interceptor의 차이를 보았는데, 오늘은 filter와 oncePerRequestFilter의 차이를 보려고 한다. 그런데 이게 필턴데 스프링 컨텍스트 안에서 작용한다고? 그럼 또 interceptor를 놓칠 수 없다..

OncePerRequestFilter

OncePerRequestFilter는 Spring Framework에서 제공하는 필터로, Spring의 서블릿 컨테이너에서 동작한다. 이는 Spring의 일반적인 필터 체인에서 동작하는데, Spring이 제공하는 기능이기 때문에 Spring 이외의 서블릿 컨테이너에서는 자동으로 동작하지 않는다. 또한 extend로 필터를 구현하며 @Component로 등록되어 있어야 한다.

A Filter can be called either before or after servlet execution. When a request is dispatched to a servlet, the RequestDispatcher may forward it to another servlet. There's a possibility that the other servlet also has the same filter. In such scenarios, the same filter gets invoked multiple times.

Spring guarantees that the OncePerRequestFilter is executed only once for a given request.

spring은 filter에서 spring config 설정 정보를 쉽게 처리하기 위한 GenericFilterBean을 제공한다.

사용자는 요청을 한 번 보냈지만 redirect나 내부적인 이슈로 한 요청이 여러 번의 요청을 낳게 될 수 있다. 이 경우 인증, 로깅 등 필터가 (의도치 않게) 중첩적으로 호출되는데 이를 방지하고 하는 필터가 OncePerRequestFilter이다. OncePerRequestFilter 역시 GenericFilterBean상속받아 구현되어 있다. 즉 스프링이 제어한다는 것이고, 스프링은 OncePerRequestFilter가 주어진 요청에 대해 단 한 번만 수행되는 것을 보장한다.

OncePerRequestFilter는 다음과 같이 동작합니다:

  • 서블릿 컨테이너에서 Spring 필터 체인이 실행될 때, OncePerRequestFilter는 요청을 가로채서 처리
  • 필터 체인을 통해 요청을 전달할 수 있으며, 필터 체인 내에서 다른 필터들이 중복 실행되지 않도록 보장
  • 필터 체인에서 중복 실행 방지가 필요한 경우에 사용. 일반적인 javax.servlet.Filter는 요청이 FORWARD 또는 INCLUDE로 서블릿 내부에서 재처리될 때 여러 번 호출될 수 있지만, OncePerRequestFilter는 각 요청당 한 번만 실행됩니다.

 

실제 동작 흐름:

아무 설정을 하지 않아도 Spring Boot는 Spring 필터(Specifically, Spring Security 등)를 서블릿 컨테이너 필터보다 먼저 실행되도록 자동으로 설정합니다. 이는 Spring Boot의 자동 구성(Autoconfiguration) 기능과 내부 필터 체인 관리 방식 덕분입니다. Spring Boot는 애플리케이션 시작 시 자동으로 Spring 필터들을 서블릿 컨테이너에서 관리되는 필터보다 우선적으로 실행되도록 설정합니다.

  1. 서블릿 컨테이너가 요청을 받습니다.
  2. Spring Boot는 Spring 필터 체인을 먼저 실행하도록 설정합니다.
  3. Spring에서 관리하는 필터(예: OncePerRequestFilter)가 먼저 실행됩니다.
  4. 그 후, 서블릿 컨테이너에 등록된 일반 필터(javax.servlet.Filter)가 실행됩니다.

once-filter -> filter -> interceptor -> controller -> interceptor -> filter -> once-filter

2024-10-08 15:35:41 INFO  [c.n.a.f.CustomServletWrappingFilter     .doFilterInternal    :  24] [once-filter][REQUEST] [GET] /api/asdf
2024-10-08 15:35:41 INFO  [c.n.aapoker.filter.RequestLogFilter     .doFilter            :  34] [filter][REQUEST] [GET] /api/asdf
2024-10-08 15:35:41 INFO  [c.n.aapoker.filter.RequestInterceptor   .preHandle           :  21] [interceptor][REQUEST] [GET] /api/asdf
2024-10-08 15:35:41 INFO  [c.n.aapoker.filter.RequestInterceptor   .postHandle          :  39] [interceptor][RESPONSE] [GET] /api/asdf 404 - 0.003 ms
2024-10-08 15:35:41 INFO  [c.n.aapoker.filter.RequestLogFilter     .doFilter            :  43] [filter][RESPONSE] [GET] /api/asdf 404 - 0.01ms
2024-10-08 15:35:41 INFO  [c.n.a.f.CustomServletWrappingFilter     .doFilterInternal    :  32] [once-filter][RESPONSE] [GET] /api/asdf 404 - 0.011ms
2024-10-08 15:35:41 INFO  [c.n.aapoker.filter.RequestInterceptor   .preHandle           :  21] [interceptor][REQUEST] [GET] /error
2024-10-08 15:35:41 INFO  [c.n.aapoker.filter.RequestInterceptor   .postHandle          :  39] [interceptor][RESPONSE] [GET] /error 404 - 0.013 ms

에러 화면을 요청할 경우 필터와 onceRequestFilter는 한번씩만 실행되고, 인터셉터는 스프링 진영에서 자동으로 route되는 error화면까지 잡힌다.

onceRequestFilter도 스프링에서 관리되는 필터지만 error를 잡지 않는다는 것을 확인!


참고) ContentCachingRequestWrapper

ContentCachingRequestWrapper는 요청(Request) 본문을 캐싱할 수 있도록 도와주는 래퍼 클래스입니다. 기본적으로, 서블릿 요청의 본문은 한 번만 읽을 수 있는 스트림 형태로 제공됩니다. 하지만 특정 상황에서는 요청 본문을 여러 번 읽어야 하거나, 로깅 또는 분석 목적으로 요청 본문을 캐싱하고자 할 때가 있습니다.

ContentCachingRequestWrapper의 역할:

  • 요청 본문을 메모리에 캐싱하여, 한 번 이상 읽을 수 있도록 지원.
  • 요청 본문을 나중에 로그로 남기거나, 요청 데이터를 분석할 때 유용.
728x90
반응형

+ Recent posts