728x90
반응형
728x90
반응형
반응형

오늘은 5강을 이어서 학습한다.

 

1. default, blocking request

아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다.

@Slf4j
public class LoadTest {
    static AtomicInteger cnt = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";

        CyclicBarrier barrier = new CyclicBarrier(101);

        for(int i = 0; i <100; i++){
            es.submit(() -> {
                int idx = cnt.addAndGet(1);

                //blocking till await을 만난 숫자가 100에 도달할 때 까지 ; 그 순간에 블라킹이 한번에 풀려; 스레드 동기화 가능; 100개가 동시에 시작
                //throw exception하기 때문에 callable 구현 필요
                barrier.await();

                log.info("thread {}", idx);
                StopWatch s1 = new StopWatch();
                s1.start();

                String res = rt.getForObject(url, String.class, idx);

                s1.stop();
                log.info("elapsed:{} -> {} / {}", idx, s1.getTotalTimeSeconds(), res);
                return null; //callable 구현한 인터페이스라고 생각함
            });
        }

        barrier.await();

        StopWatch sw = new StopWatch();
        sw.start();

        //graceful shutdown이라서 먼저 걸고 기다림
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        sw.stop();
        log.info("total time {}", sw.getTotalTimeSeconds());
    }
}
@SpringBootApplication
public class Application {

    @RestController
    public static class MyController{
        RestTemplate rt = new RestTemplate();

        //thread 1개
        //2초씩 대기; 톰캣 큐에 대기
        @GetMapping("/rest")
        public String rest(int idx){
            //RestTemplate blocking; 응답올때까지 대기; 스레드가 더 필요로하는 요청이라면 홀딩
            String res = rt.getForObject("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
            return res;
        }
    }


    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

//설정
server.tomcat.threads.max=1
spring.task.execution.pool.core-size=100
@SpringBootApplication
public class RemoteService {

    @RestController
    public static class MyController{
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(2_000);
            return req + "/service";
        }
    }

    public static void main(String[] args) {
    	//application.properties 오버라이드하는 설정
        System.setProperty("server.port", "8081");
        System.setProperty("server.tomcat.threads.max", "1000");
        SpringApplication.run(RemoteService.class, args);
    }
}
...
12:23:06.118 [pool-1-thread-39] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:06.119 [pool-1-thread-39] INFO toby.live.lecture4.LoadTest - elapsed:39 -> 8.463001595 / hello39/service
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:08.127 [pool-1-thread-30] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:08.127 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 10.472331585 / hello30/service
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:23:10.139 [pool-1-thread-34] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:23:10.140 [pool-1-thread-34] INFO toby.live.lecture4.LoadTest - elapsed:34 -> 12.485099461 / hello34/service
...

2초 이상이 걸리는 100건의 작업이고 응답을 기다리는 동안은 블로킹되기 때문에 총 200초가량 소요된다. 허나 복잡한 로직도 아니고, 사실 cpu는 놀고 있을 것이다.. 이를 비동기로 바꿔보자.

 

2. AsyncRestTemplate(spring5.0/springboot2.0에서 deprecated)

위 소스에서 /rest의 rest template만 바꿔본다.

@RestController
public static class MyController{
    AsyncRestTemplate rt = new AsyncRestTemplate();

    //thread 1개
    @GetMapping("/rest")
    public ListenableFuture<ResponseEntity<String>> rest(int idx){
        //호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
        //비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
        ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
        return res;
    }
}
...
12:42:49.442 [pool-1-thread-54] INFO toby.live.lecture4.LoadTest - elapsed:54 -> 2.516070823 / hello54/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.444 [pool-1-thread-12] INFO toby.live.lecture4.LoadTest - elapsed:12 -> 2.513689601 / hello12/service
12:42:49.444 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
12:42:49.445 [pool-1-thread-89] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
12:42:49.445 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 2.518126106 / hello98/service
12:42:49.445 [pool-1-thread-89] INFO toby.live.lecture4.LoadTest - elapsed:89 -> 2.515622072 / hello89/service
12:42:49.446 [main] INFO toby.live.lecture4.LoadTest - total time 2.527531376

이렇게 실행하면 총 실행시간이 2초 남짓으로 마치 요청 한 건을 한 것과 비슷하게 나온다. 하지만 자바에서 제공하는 분석 툴인 jmc를 이용하여 /rest를 실행한 애플리케이션의 thread의 개수를 확인해보면..

서블랫 스래드는 하나만 만들어져야 하는데, 사실 뒤에서는 100개 이상의 스레드를 추가적으로 생성해서 요청하고 있음을 알 수 있다. 톰캣이 100개의 스레드를 만들었나 싶은데, 사실 톰캣 스래드는 사진에서 보는 것과 같이 하나고, 자바 api를 통해 백그라운드에서 100개 스레드를 만들어 놓고 실행하는 것이다. 즉 매번 서버 자원을 사용한다는 것인데, 이는 비효율적이다. 최소한의 자원을 사용해야 한다.

 

3. netty / nonblocking io를 사용하는 client lib 사용

/rest 부분의 rest template를 async rest template으로 변경하고 네티를 사용한다. 네티의 스레드를 1개로 설정한다.

@RestController
public static class MyController{
    //Netty4ClientHttpRequestFactory spring 제공
    //netty도 스레드 1개
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    //thread 1개
    @GetMapping("/rest")
    public ListenableFuture<ResponseEntity<String>> rest(int idx) throws ExecutionException, InterruptedException {
        log.info("/rest {}", idx);
        //호출을 비동기로; 바로 스레드 리턴, 응답이 오면 콜백은 스프링이 알아서; 응답이 오면 이 컨트롤러의 응답이라 생각하고 처리함; 콜백을 처리할 필요 없음
        //비동기를 호출하기 위해 스래드를 백그라운드에 미리 만들어; 자바의 api를 사용해서; 겉으로는 한 스레드에서 처리한 것 처럼 보이지만 사실 서버의 자원을 사용한 것
        ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
        return res;
    }
}

그리고 실행을 하면 처음에는 3초가 걸리고, 다시 또 실행을 하면.. 8초가 걸리고..  다시 실행하면 timeout이 난다..

토비님과는 다른 결과.. 왜지ㅠ

...
15:13:45.147 [pool-1-thread-64] INFO toby.live.lecture4.LoadTest - elapsed:64 -> 3.230518378 / hello64/service
15:13:45.148 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 3.229926106 / hello82/service
15:13:45.148 [pool-1-thread-53] INFO toby.live.lecture4.LoadTest - elapsed:53 -> 3.229702047 / hello53/service
15:13:45.148 [pool-1-thread-62] INFO toby.live.lecture4.LoadTest - elapsed:62 -> 3.229505908 / hello62/service
15:13:45.149 [main] INFO toby.live.lecture4.LoadTest - total time 3.242286844

...
15:15:27.268 [pool-1-thread-60] INFO toby.live.lecture4.LoadTest - elapsed:60 -> 8.542415481 / hello60/service
15:15:27.268 [main] INFO toby.live.lecture4.LoadTest - total time 8.556960287

...
15:18:02.122 [pool-1-thread-26] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-18] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.124 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.126 [pool-1-thread-12] DEBUG org.springframework.web.client.RestTemplate - Response 503 SERVICE_UNAVAILABLE
15:18:02.129 [main] INFO toby.live.lecture4.LoadTest - total time 30.97585281

 

타임아웃이 나는 현상을 지켜보니 아래와 같은 순서로 진행되었다.

  1. LoadTest -> /rest로 100건 요청 
  2. /rest에서 100건 요청받고 -> /service로 요청해야 하는데 /service 쪽 로그에 아무것도 안 찍힘
  3. 1분 뒤에 /rest에서 async request timeout exception 걸려서 503 return
  4. 그 후에 /service가 요청을 받아서 처리 하지만 /rest에서는 이미 exception 처리돼서 무쓸모

왜 /rest는 요청을 바로 받는데 /service로 바로 안 보내고 1분 후에 한 번에 보내는 걸까.. block 되어 있는 건가..

/rest가 1개의 스레드에서 도는데 어디선가 블라킹이 있어서 안 되는 것 같은데, 왠지 모르겠다. 느낌은 저 시대와 지금의 기본적인 풀 세팅 정보가 달라서 그럴 것 같다.

 

어쨌건 스레드를 100개씩이나 더 생성하지 않는 것은 확인했다.

 

자꾸 타임아웃이 나지만 우선은 진행한다.

ListenableFuture로 응답을 그대로 내리면 응답을 꺼내서 수정할 수가 없다. 이때 사용 가능한 것이 DeferredResult이다.

ListenableFuture에 콜백을 달고 DeferredResult에 setResult를 하면 응답을 수정할 수 있다

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();
    
    ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
    //res.get -> blocking 으로 바뀌어버림 의미없음
    //callback이 실행되는거지 리턴을 주는게 아님; 응답을 받고 변형하려면? deferredReuslt!
    res.addCallback(s -> {
        dr.setResult(s.getBody() + "/work");  //응답 변형
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });

    return dr;
}

중첩 콜을 원한다면..

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) throws ExecutionException, InterruptedException {
    log.info("/rest {}", idx);
    DeferredResult<String> dr= new DeferredResult<>();
    ListenableFuture<ResponseEntity<String>> res = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
    res.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
        f2.addCallback(s2-> {
            dr.setResult(s2.getBody());
        }, e2-> {
            dr.setErrorResult(e2.getMessage());
        });
    }, e-> {
        //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
        dr.setErrorResult(e.getMessage());
    });
    return dr;
}

2초짜리 콜을 2번 중첩해서(/service2는 /service의 카피본이다) 4초가량이 걸린다. (하지만 역시 나는 위와 같은 현상이 계속되어서 안되었다..)

 

3번 중첩한다면..

@Slf4j
@EnableAsync
@SpringBootApplication
public class Application {
    @RestController
    public static class MyController{
        //Netty4ClientHttpRequestFactory spring 제공
        //netty도 스레드 1개
        AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

        @Autowired
        MyService myService;

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        //thread 1개
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
            log.info("/rest {}", idx);
            DeferredResult<String> dr= new DeferredResult<>();

            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
            f1.addCallback(s -> {
                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
                f2.addCallback(s2 -> {
                    ListenableFuture<String> f3 = myService.work(s2.getBody());
                    f3.addCallback(s3 -> {
                        dr.setResult(s3);
                    }, e3 -> {
                        dr.setErrorResult(e3.getMessage());
                            }
                    );
                }, e2-> {
                    dr.setErrorResult(e2.getMessage());
                });
            }, e-> {
                dr.setErrorResult(e.getMessage());
            });
            return dr;
        }
    }

    @Service
    public static class MyService{
        @Async
        public ListenableFuture<String> work(String req){
            return new AsyncResult<String>(req + "/asyncwork");
        }
    }

    @Bean
    ThreadPoolTaskExecutor myPool(){
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1); //queue까지 다 차면 맥스
        te.initialize();
        return te;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

주의해야 하는 것은 어디서든 다 async, 비동기가 되게끔 작성해야 한다는 것, 아니면 순식간에 동기로 바뀐다.

 

 

728x90
반응형
반응형

이전 글: 2022.03.16 - [개발/reactive] - [reactive] 4-1. java Future/FutureTask/Callable/Runnable

 

[reactive] 4-1. java Future/FutureTask/Callable/Runnable

callable vs runnable interface Runnable interface Callable interface java version package java 1.0 ~ java.lang java 1.5 ~ java.util.concurrent return 계산 결과를 받을 수 없음 계산 결과 받을 수 있음(..

bangpurin.tistory.com

이전 글과 오늘 글은 아래 유튜브를 보고 작성하였다.

 

1. using Spring Async

@Slf4j
@EnableAsync  //있어야 async 작동
@SpringBootApplication
public class Spring {

    @Component
    public static class MyService{
        //future인데 등록해서 받아볼 수 있게하는, 스프링이 만든 ListenableFuture
        //요청할 때 마다 스레드를 계속 만들어 비효율적인 simpleAsyncTaskExecutor -> 빈으로 풀 등록해서 쓰자
        //비동기가 여러개고 스레드 풀을 분리해서 지정하고 싶으면 Async("name") 지정 가능
        @Async
        public ListenableFuture<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(2_000);
            return new AsyncResult<>("hello");
        }
    }

    //task executor를 빈으로 해두면 알아서 가져가서 쓰게 되어있음
    @Bean
    ThreadPoolTaskExecutor threadPoolExecutor(){
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);//기본으로 만드는거; 첫번째 스레드 요청이 올 때; jmx를 통해 런타임에도 값 수정이 가능
        te.setQueueCapacity(200);//기본 풀이 다 차면 큐에 먼저 차
        te.setMaxPoolSize(100);//큐가 꽉 찼을 때 에러나는데 그걸 막아주려고 하는 설정
        te.setThreadNamePrefix("async-");
        te.initialize();
        return te;
    }

    public static void main(String[] args) {
        //try with resource로 실행하면 작업 후 알아서 종료
        try(ConfigurableApplicationContext c = SpringApplication.run(Spring.class, args)){

        }
    }

    @Autowired MyService myService;

    //모든 빈 등록 마치면 바로 실행 됨
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
            log.info("Exit");
        };
    }
}

여기서 주의할 점.

ThreadPoolTaskExecutor에 max pool size에 설정하는 값은 큐가 다 차고 나서 늘어가는 풀 사이즈이다.

https://www.baeldung.com/java-threadpooltaskexecutor-core-vs-max-poolsize

 

참고로 HttpServletRequest/Response -> 자바의 InputStream 을 구현한 것으로, blocking방식이다.


2. using Spring Web - default

우선 서버 설정 값

#서블릿 스레드
server.tomcat.threads.max=1
#작업스레드풀?
spring.task.execution.pool.core-size=100
@Slf4j
public class LoadTest {
    static AtomicInteger cnt = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/callable";

        StopWatch sw = new StopWatch();
        sw.start();

        for(int i = 0; i <100; i++){
            es.execute((() -> {
                int idx = cnt.addAndGet(1);
                log.info("thread {}", idx);
                StopWatch s1 = new StopWatch();
                s1.start();
                rt.getForObject(url, String.class);
                s1.stop();
                log.info("elapsed:{} -> {}", idx, s1.getTotalTimeSeconds());
            }));
        }
     
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        sw.stop();
        log.info("total time {}", sw.getTotalTimeSeconds());
    }
}
@RestController
public static class MyController{
    @GetMapping("/callable")
    public String callable() throws InterruptedException {
        log.info("async");
        Thread.sleep(2_000);
        return "hello";
    }
}

위 요청은 아래 그림과 같이 표현된다.

load test는 100개의 스래드를 만들어 거의 동시에 요청했으나, 순차 처리로 인해 총 소요 시간은 200초가 넘는다.

...
13:04:56.428 [pool-1-thread-50] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.428 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-90] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
...
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:04.994 [pool-1-thread-61] INFO toby.live.lecture4.LoadTest - elapsed:61 -> 188.612548654
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:06.999 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 190.618529907
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:09.003 [pool-1-thread-71] INFO toby.live.lecture4.LoadTest - elapsed:71 -> 192.622624014
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:11.009 [pool-1-thread-32] INFO toby.live.lecture4.LoadTest - elapsed:32 -> 194.629844982
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:13.013 [pool-1-thread-35] INFO toby.live.lecture4.LoadTest - elapsed:35 -> 196.631976988
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:15.016 [pool-1-thread-59] INFO toby.live.lecture4.LoadTest - elapsed:59 -> 198.634385976
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:17.019 [pool-1-thread-65] INFO toby.live.lecture4.LoadTest - elapsed:65 -> 200.638424197

/callable 쪽은 요청을 거의 동시에 100개의 요청을 받았지만 일반 동기 방식이므로 하나씩 처리한다.

...
2022-03-17 13:08:06.998  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:09.002  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:11.008  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:13.012  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:15.014  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async

 

3. callable

/callable 쪽만 소스를 수정한다.

@RestController
public static class MyController{
    @GetMapping("/callable")
    public Callable<String> callable() throws InterruptedException {
        log.info("callable");
        //Callable Interface 익명 클래스
        return () -> {
            log.info("async");
            Thread.sleep(2_000);
            return "hello";
        };
    }

callable은 요청을 받으면 별도의 스레드를 생성해 일을 병렬로 처리한다.

100개의 스래드가 거의 동시에 요청을 보내 100개의 작업 스래드가 받아서 동시에 처리하기 때문에 총 소요시간도 2초 남짓이다.

...
13:33:24.694 [pool-1-thread-63] INFO toby.live.lecture4.LoadTest - elapsed:63 -> 2.289299445
13:33:24.694 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 2.291355199
13:33:24.695 [pool-1-thread-67] INFO toby.live.lecture4.LoadTest - elapsed:67 -> 2.292713823
13:33:24.695 [pool-1-thread-81] INFO toby.live.lecture4.LoadTest - elapsed:80 -> 2.289499954
13:33:24.695 [pool-1-thread-3] INFO toby.live.lecture4.LoadTest - elapsed:3 -> 2.290040178
13:33:24.695 [pool-1-thread-57] INFO toby.live.lecture4.LoadTest - elapsed:57 -> 2.292708376
13:33:24.696 [pool-1-thread-44] INFO toby.live.lecture4.LoadTest - elapsed:44 -> 2.29307173
13:33:24.696 [main] INFO toby.live.lecture4.LoadTest - total time 2.307467369

/callable이 1개의 서블릿 스레드로 받았지만 별도의 작업 스래드로 바로 토스하여 일을 하는 모습..

...
2022-03-17 13:33:22.624  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.624  INFO 23081 --- [        task-98] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:33:22.625  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.625  INFO 23081 --- [        task-99] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:33:22.625  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.625  INFO 23081 --- [       task-100] toby.live.lecture4.SpringWeb             : async

 


4. DeferredResult

DeferredResult란 “지연된 결과”를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 비동기 작업이다. 

기존에 /callable api가 있던 곳에 아래와 같이 신규 api를 추가해준다.

Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();

@GetMapping("/dr")
public DeferredResult<String> dr(){
    log.info("dr");
    //DR은 setResult가 오기전 까지 응답을 끊지 않고 대기, 그래서 테스트 시 타임아웃 시간을 길게
    //결과가 오면 바로 return
    DeferredResult<String> dr = new DeferredResult<>(600_000L);
    results.add(dr);
    return dr;
}

@GetMapping("/dr/count")
public String drcount(){
    return String.valueOf(results.size());
}

@GetMapping("/dr/event")
public String drevent(String msg){
    for(DeferredResult<String> dr: results){
        dr.setResult("hello " + msg);
        results.remove(dr);
    }
    return "OK";
}

load test에 /dr을 쏘게끔 설정하고 실행하면 100개의 요청을 우르르 보내지만 아무 응답을 받지 않고 대기하는 모습을 보이는데

이때 /dr/count를 쏴보면 100개의 요청이 큐에 들어있는 것을 확인할 수 있다.

그 상태에서 /dr/event?msg=today 를 요청하면 그제야 100개의 응답이 한 번에 온다. setResult라는 이벤트를 주면 리턴하는 방식이다.

...
13:48:17.039 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 213.022709247
13:48:17.039 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 213.027111781
13:48:17.039 [pool-1-thread-16] INFO toby.live.lecture4.LoadTest - elapsed:16 -> 213.026715767
13:48:17.040 [pool-1-thread-1] INFO toby.live.lecture4.LoadTest - elapsed:1 -> 213.025866684

 

5. ResponseBodyEmitter

ResponseBodyEmitter는 하나의 요청에 대한 응답을 여러번에 나눠서 보낼 때 사용된다.

//여러번에 나눠서 데이터를 보낸다
@GetMapping("/emitter")
public ResponseBodyEmitter emitter(){
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    Executors.newSingleThreadExecutor().submit(() -> {
        try {
            for(int i =0; i<=50; i++){
                emitter.send("<p> stream " +i+ "</p>");
                Thread.sleep(100);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    return emitter;
}

/emitter를 브라우저로 실행하면 stream 문구가 100ms 마다 계속 뜨는 것을 볼 수 있다. 위에 상태 표시에도 로딩으로 되어 있어 계속 응답을 받아오고 있음을 알 수 있다.


참고

https://jongmin92.github.io/2019/03/31/Java/java-async-1/

 

자바와 스프링의 비동기 기술

해당 포스팅은 토비님의 토비의 봄 TV 8회 스프링 리액티브 프로그래밍 (4) 자바와 스프링의 비동기 기술 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다. 실습 코드들은 Inte

jongmin92.github.io

 

728x90
반응형
반응형

callable vs runnable interface

  Runnable interface Callable interface
java version
package
java 1.0 ~
java.lang
java 1.5 ~
java.util.concurrent
return 계산 결과를 받을 수 없음 계산 결과 받을 수 있음(a generic value V)
throw checked exception을 throw 불가 checked exception을 throw 가능
override method run() call()

https://www.geeksforgeeks.org/difference-between-callable-and-runnable-in-java/

 

Difference Between Callable and Runnable in Java - GeeksforGeeks

A Computer Science portal for geeks. It contains well written, well thought and well explained computer science and programming articles, quizzes and practice/competitive programming/company interview Questions.

www.geeksforgeeks.org

 

자바 비동기 맛보기

0. ExecutorService 특징

  • es.execute vs es.submit 
  • 시간이 오래 걸리는 작업을 할 때는 submit 추천. future object를 리턴하기 때문에 나중에 get으로 꺼낼 수 있음(get은 블로킹!)
  • 그냥 스레드의 병렬 처리만 필요하면 execute
  execute() submit()
accept Runnable Runnable and Callable
declared in Executor interface ExecutorService interface
return void a Future object
  • es.shutdown 은 graceful shutdown이라서 할 거 다 할 때까지 기다림
  • es.shutdownNow 가 강제 종료
  • es.awaitTermination 은 타임아웃을 기다리는 용도로 실제 shutdown 시키지는 않음
  • 따라서 shutdown 먼저 걸로 awaitTermination을 써야 함

 

1. Future class

public static void main(String[] args) throws InterruptedException, ExecutionException {
    //스래드를 새로 만들고 반납하는 것이 비용이 많이 들어가니, 풀이 나옴
    //맥시멈 제한없고 처음에 스레드 없음 요청할 때 새로 만들고 다음 요청 시 기존 만든 스레드 실행
    ExecutorService es = Executors.newCachedThreadPool();
    //es.execute -> void

    //Future 비동기 핸들러지 결과물은 아님
    Future<String> f = es.submit(() -> {
        Thread.sleep(5_000);
        log.info("helloo");
        return "hello";
    });
    //f.isDone() -> non blocking method;
    System.out.println(f.isDone());
    log.info("before");
    //f.get -> blocking method
    System.out.println(f.get()); //비동기가 완성될 때 까지 블로킹(20초 기다림)
    log.info("exit");//맨 마지막
    System.out.println(f.isDone());
}

 

2. FutureTask class

Future 자체를 object로 만들어줌

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    FutureTask<String> f = new FutureTask<String>(() -> {
       Thread.sleep(3_000);
       log.info("async");
       return "hello";
    }){//익명클래스
        @Override
        protected void done() { //다 하면 이거해; f.get 할 필요없음
            try {
                System.out.println(get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    };

    es.execute(f);
    System.out.println(f.isDone());
    //System.out.println(f.get());
    es.shutdown(); //다 끝나면 종료
}

 

3. FutureTask 확장

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    CallbackFutureTask f = new CallbackFutureTask(() -> {
        Thread.sleep(3_000);
        if(1 == 1){
            throw new RuntimeException(">async error");
        }
        log.info("async");
        return "hello";
    }, s -> System.out.println("success:" + s)
    , e -> System.out.println(">error: " + e.getMessage())
    );

    es.execute(f);
    es.shutdown(); //다 끝나면 종료
}

interface SuccessCallback{
    void onSuccess(String result);
}

interface ExceptionCallback{
    void onError(Throwable t);
}

public static class CallbackFutureTask extends FutureTask<String>{
    SuccessCallback sc;
    ExceptionCallback ec;

    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
        super(callable);
        //if(sc == null) throw null; //null point exception
        this.sc = Objects.requireNonNull(sc); //if null -> NPE
        this.ec = Objects.requireNonNull(ec);
    }

    @Override
    protected void done() {
        try {
            sc.onSuccess(get());
        } catch (InterruptedException e) { //종료 시그널
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) { // 찐에러
            ec.onError(e.getCause());
        }
    }
}
728x90
반응형
반응형

세 번째 강의 시작한다.

 

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    pub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [main] DEBUG toby.live.SchedulerEx  - onNext:1
2    [main] DEBUG toby.live.SchedulerEx  - onNext:2
2    [main] DEBUG toby.live.SchedulerEx  - onNext:3
2    [main] DEBUG toby.live.SchedulerEx  - onNext:4
2    [main] DEBUG toby.live.SchedulerEx  - onNext:5
2    [main] DEBUG toby.live.SchedulerEx  - onComplete
EXXXXIT

위 코드를 돌리면 아래와 같이 로그가 찍힌다. 보이는 것처럼 main 스레드가 모든 일을 다 하고 있고, 만약 중간에 블로킹이 걸리면 main스레드는 진행을 하지 못하게 된다. pub - sub 사이의 일은 다른 스레드가 하게끔 하고 main스레드는 지나치게 하려면 어떻게 할까

 

subscribeOn

  • 주는자(publisher)가 느리거나 예측 불가할 경우 & 처리(consumer)가 빠를 때
  • subscriber를 별도의 스레드로 분리
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(); //1개 스레드, 몰리면 큐에 넣고 순차처리
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    subOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

아까랑 다르게 main 스레드에서 exit을 먼저 하고 나머지 스레드로 작업한다.

 

publishOn

  • 데이터 생성(publisher)은 빠른데 처리하는 곳(consumer)이 느릴 때 
  • publisher를 별도의 스레드로 분리
  • 데이터 생성은 메인에서 빨리해 근데 가져가는 건 느리니까 별도에서 진행해
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(); //코어 갯수가 하나라서 큐에 넣어서 안꼬임

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
1    [main] DEBUG toby.live.SchedulerEx  - subscription request
EXXXXIT
4    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

이 친구는 onSubscribe, exit까지 main 스레드에서 해버리고 나머지를 할당한 별도의 스레드에서 진행한다.

 

publishOn / subscribeOn 개념을 둘 다 사용할 수도 있다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){ //1개 스레드, 몰리면 큐에 넣고 순차처리
            @Override
            public String getThreadNamePrefix() {
                return "subOn-";
            }
        }); 
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        subOnPub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                @Override
                public String getThreadNamePrefix() {
                    return "pubOn-";
                }
            });

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [subOn-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [subOn-1] DEBUG toby.live.SchedulerEx  - subscription request
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:1
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:2
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:3
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:4
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:5
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onComplete

각 스레드에 이름을 주어 로그를 확인해봤더니, 메인은 메인대로 진행하였고(exxit) onSubscribe, request 까지는 subOn 스레드에서 진행하였고 나머지는 pubOn 스레드에서 진행함을 알 수 있다.

 

원리는 이렇고, flux로 구현하면 아래와 같다. 상황에 따라 publishOn/subscribeOn을 하나만 쓸 수도, 모두 쓸 수도 있겠다.

public static void main(String[] args) {
    Flux.range(1, 10) //1부터 10개
        .publishOn(Schedulers.newSingle("pub"))
        .log()
        .subscribeOn(Schedulers.newSingle("sub"))
        .subscribe(System.out::println); //onNext
    System.out.println("exit");
}
exit
32   [sub-1] INFO  reactor.Flux.PublishOn.1  - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
33   [sub-1] INFO  reactor.Flux.PublishOn.1  - | request(unbounded)
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(1)
1
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(2)
2
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(3)
3
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(4)
4
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(5)
5
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(6)
6
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(7)
7
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(8)
8
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(9)
9
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(10)
10
35   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onComplete()

 

flux.interval

메인 스레드 종료 후 별도 스레드에서 200ms마다 하나씩 숫자를 받아서 실행. 5초 지나면 종료되는 코드 작성.

public static void main(String[] args) throws InterruptedException {

    //user thread 하나라도 있으면 강종안함
    //daemon thread 만 남으면 강종
    Flux.interval(Duration.ofMillis(200))//주기를 가지고 숫자를 증가; 무한으로
        .take(10) //10개 받으면 끝; 중지시킨다
        .subscribe(s -> log.debug("onNext::{}", s));

    log.debug("Exit");
    TimeUnit.SECONDS.sleep(5); //그래서 유저 스래드를 만들어줘야 실행됨
}
15   [main] DEBUG toby.live.FluxScEx  - Exit
223  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::0
419  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::1
619  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::2
818  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::3
1015 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::4
1219 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::5
1419 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::6
1618 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::7
1817 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::8
2019 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::9

 

이를 reactor stream 없이 구현한다면?

newSingleThreadScheduledExecutor.scheduleAtFixedRate 이란 함수와 cancel을 복합 구현하여 만들면 된다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = sub -> {
        sub.onSubscribe(new Flow.Subscription() {
            int no = 0;
            boolean cancelled = false;

            @Override
            public void request(long n) {
                ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

                exec.scheduleAtFixedRate(() -> {  //interval
                    if(cancelled){
                        exec.shutdown();
                        return;
                    }
                    sub.onNext(no++);
                }, 0, 300, TimeUnit.MILLISECONDS); //죽을 때 까지 계속 저 주기로 날려
            }

            @Override
            public void cancel() {
                log.debug("cancel called");
                cancelled = true;
            }
        });
    };

    Flow.Publisher<Integer> takePub = sub -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {
            int count = 0;
            Flow.Subscription subs;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                sub.onSubscribe(subscription);
                subs = subscription;
            }

            @Override
            public void onNext(Integer item) {
                sub.onNext(item);
                if(++count >= 5){
                    subs.cancel();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                sub.onError(throwable);
            }

            @Override
            public void onComplete() {
                sub.onComplete();
            }
        });
    };

    takePub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
}
0    [main] DEBUG toby.live.IntervalEx  - onSubscribe
8    [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:0
309  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:1
607  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:2
908  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:3
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:4
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - cancel called
728x90
반응형
반응형

오늘은 두 번째 강의이다.

 

operator라고 publisher와 subscriber사이에서 데이터 변형을 도와주는 것을 만들어본다.

publisher -> [data1] -> operator1 -> [data2] -> oper2 -> [data3] -> subscriber

이렇게 operator1은 publisher에게는 subscriber, oper2에게는 publisher의 역할을 하게 된다. 

 

1. map (1-1 mapping) 작성

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    Flow.Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10); //d2로 만든다
    Flow.Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer, Integer>) s -> -s); //d3로 만든다
    map2Pub.subscribe(logSub());
}

private static Flow.Publisher<Integer> mapPub(Flow.Publisher<Integer> pub, Function<Integer, Integer> f) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) { //logSub
            pub.subscribe(new DelegateSub(subscriber) {
                @Override
                public void onNext(Integer item) {
                    subscriber.onNext(f.apply(item));
                }
            });
        }
    };
}

private static Flow.Subscriber<Integer> logSub() {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(Integer item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            //반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //갯수는 신경쓰지 말자
                    try{
                        iter.forEach(s -> subscriber.onNext(s));
                        subscriber.onComplete(); //다 보내면 완료를 알려줘야함
                    }catch (Throwable t){
                        subscriber.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    };
}

위 코드에서 흐름은 아래와 같다. pub -> logSub로 데이터가 흘러가는걸 downstream이라고 하고 반대 방향으로 올라오는 것을 upstream이라고 부른다.

pub -> [data1] -> mapPub -> [data2] -> logSub
                        <- subscribe(logSub)
                        -> onSubscribe(s)
                        -> onNext
                        -> onNext
                        -> onComplete

일대일 매핑 방식이라 (1-> 10; 2-> 20; ...) mapSub의 onNext에서 매번 logSub의 onNext를 불러 데이터를 전송하는 것을 알 수 있다.

 

2. reduce 작성

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    //Flow.Publisher<Integer> sumPub = sumPub(pub); //합계
    //sumPub.subscribe(logSub());

    Flow.Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer, Integer, Integer>)(a, b) -> a+b ); //합계
    reducePub.subscribe(logSub());
}

//1, 2, 3, 4, 5
// 0 -> (0, 1) -> 1
// 1 -> (1, 2) -> 3
// 3 -> (3, 3) -> 6
// ...
private static Flow.Publisher<Integer> reducePub(Flow.Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber){
                int result = init;
                @Override
                public void onNext(Integer item) {
                    result = bf.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}

private static Flow.Publisher<Integer> sumPub(Flow.Publisher<Integer> pub) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber){
                int sum = 0;
                @Override
                public void onNext(Integer item) {
                    sum += item;
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(sum);
                    subscriber.onComplete();
                }
            });
        }
    };
}

private static Flow.Subscriber<Integer> logSub() {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(Integer item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            //반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //갯수는 신경쓰지 말자
                    try{
                        iter.forEach(s -> subscriber.onNext(s));
                        subscriber.onComplete(); //다 보내면 완료를 알려줘야함
                    }catch (Throwable t){
                        subscriber.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    };
}

합계와 같이 n개의 데이터가 하나의 데이터로 흡수되어 내려갈 때에는 onNext에서는 필요한 계산을 하고 onComplete에서 부모의 onNext를 불러 데이터를 한 번만 전송해주고, onComplete를 불러 더 이상 데이터가 없음을 알려준다.

 

3. generic으로 변환

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    Flow.Publisher<String> mapPub = mapPub(pub, (Function<Integer, String>) s -> "[" +s+ "]"); //d2로 만든다
    mapPub.subscribe(logSub());

    //Flow.Publisher<String> reducePub = reducePub2(pub, "", (BiFunction<String, Integer, String>)(a, b) -> a + "-" + b );
    Flow.Publisher<StringBuilder> reducePub = reducePub2(pub, new StringBuilder(), (a, b) -> a .append(b+","));
    reducePub.subscribe(logSub());

}

//generic으로 전환
//publisher가 발행한 T(int) -> R(string)으로 전환해주는 매개체(pub이자 sub)
private static <T, R> Flow.Publisher<R> mapPub(Flow.Publisher<T> pub, Function<T, R> f) {
    return new Flow.Publisher<R>() {
        @Override
        public void subscribe(Flow.Subscriber<? super R> subscriber) { //logSub
            pub.subscribe(new DelegateSub<T, R>(subscriber) {
                @Override
                public void onNext(T item) {
                    subscriber.onNext(f.apply(item));
                }
            });
        }
    };
}

private static <T, R> Flow.Publisher<R> reducePub2(Flow.Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
    return new Flow.Publisher<R>() {
        @Override
        public void subscribe(Flow.Subscriber<? super R> subscriber) {
            pub.subscribe(new DelegateSub<T, R>(subscriber){
                R result = init;

                @Override
                public void onNext(T item) {
                    result = bf.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}

//publisher가 발행한 숫자를 변환한 최종 값(string)을 찍기만 하는 subscriber
private static <T> Flow.Subscriber<T> logSub() {
    return new Flow.Subscriber<T>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(T item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

 

4. reactor 라이브러리 사용

위에서 손으로 짠 코드는 사실 reactor lib를 사용하면 손쉽게 작성할 수 있다.

implementation 'io.projectreactor:reactor-core:3.4.15'

위 dependency를 추가하고 아래와 같이 진행한다.

 

public static void main(String[] args){
    Flux.create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .subscribe(System.out::println);
}
//로그
0    [main] DEBUG reactor.util.Loggers  - Using Slf4j logging framework
15   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
16   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
18   [main] INFO  reactor.Flux.Create.1  - onNext(1)
1    //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onNext(2)
2    //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onNext(3)
3    //sysout 결과
19   [main] INFO  reactor.Flux.Create.1  - onComplete()

로그(log())를 같이 살펴보면 

  1. flux의 .subscribe를 만나면 타고타고 올라가서 onSubscribe를 먼저 실행하고
  2. request(Long.MAX) -> unbounded로 표현되어 있는 것이고
  3. onNext -> -> onComplete를 실행하는 것을 알 수 있다.

 

<map>

public static void main(String[] args){
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s*10)
    .log()
    .subscribe(System.out::println);
}
14   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
15   [main] INFO  reactor.Flux.Map.2  - onSubscribe(FluxMap.MapSubscriber)
15   [main] INFO  reactor.Flux.Map.2  - request(unbounded)
15   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
17   [main] INFO  reactor.Flux.Create.1  - onNext(1)
17   [main] INFO  reactor.Flux.Map.2  - onNext(10)
10   // sysout 결과
17   [main] INFO  reactor.Flux.Create.1  - onNext(2)
17   [main] INFO  reactor.Flux.Map.2  - onNext(20)
20   //sysout 결과
17   [main] INFO  reactor.Flux.Create.1  - onNext(3)
17   [main] INFO  reactor.Flux.Map.2  - onNext(30)
30   //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onComplete()
18   [main] INFO  reactor.Flux.Map.2  - onComplete()

로그를 위아래 걸어줬더니 위와 같은 로그가 찍혔다. Flux.Create. 로그는 위 log(), Flux.Map. 로그는 아래 log()라고 보면 된다. 위에서는 1이 넘어갔지만 map operator를 건너서 10이 되어 넘어가는 것을 볼 수 있다.

 

<reduce>

public static void main(String[] args){
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s*10)
    .log()
    .reduce(0, (a, b) -> a+b)
    .log()
    .subscribe(System.out::println);
}
45   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
46   [main] INFO  reactor.Flux.Map.2  - onSubscribe(FluxMap.MapSubscriber)
46   [main] INFO  reactor.Mono.ReduceSeed.3  - | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)
46   [main] INFO  reactor.Mono.ReduceSeed.3  - | request(unbounded)
46   [main] INFO  reactor.Flux.Map.2  - request(unbounded)
47   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
48   [main] INFO  reactor.Flux.Create.1  - onNext(1)
48   [main] INFO  reactor.Flux.Map.2  - onNext(10)
48   [main] INFO  reactor.Flux.Create.1  - onNext(2)
48   [main] INFO  reactor.Flux.Map.2  - onNext(20)
48   [main] INFO  reactor.Flux.Create.1  - onNext(3)
49   [main] INFO  reactor.Flux.Map.2  - onNext(30)
49   [main] INFO  reactor.Flux.Create.1  - onComplete()
49   [main] INFO  reactor.Flux.Map.2  - onComplete()
49   [main] INFO  reactor.Mono.ReduceSeed.3  - | onNext(60)
60   //sysout 결과
49   [main] INFO  reactor.Mono.ReduceSeed.3  - | onComplete()

reduce operator를 마지막에 추가하고 보면 매 숫자를 받아서(create) 10을 곱하고(map) onNext를 계속하다가 마지막에 오면(onComplete) reduce를 실행해서 최종적으로 60을 받는 것을 볼 수 있다.


위 1~3 과정을 거치고 reactor lib를 사용한 후 로그를 보니 왜 그렇게 로그가 지나가는지 이해하기가 쉬웠다. 왜 굳이 예전 방식으로 코딩을 하시는걸까 궁금했는데 기본적인 원리를 손으로 짜보고 나니까 흐름을 파악하는데 큰 도움이 되는 것 같다!

728x90
반응형
반응형

reactive programming과 관련하여 토비의 라이브 10강을 시청하고 요약정리하려고 한다.

 

1. Iterable vs Observable (duality)

  • Iterable(pull) 방식:   사용하고자 하는 사람이 가져다가 쓴다.
  • Iterable#next
public static void main(String[] args){
    Iterable<Integer> iter = () ->
            new Iterator<>() {
                int i = 0;
                final static int MAX = 10;

                public boolean hasNext() {
                    return i < MAX;
                }

                public Integer next() {
                    return ++i;
                }
            };

    for(Integer i : iter){ //for each
        System.out.println(i);
    }

    for(Iterator<Integer> it = iter.iterator(); it.hasNext();){
        System.out.println(it.next());  //pull 데이터를 꺼내자
    }
}
  • Observable(push) 방식: 데이터를 주면 그걸 받아서 처리하는 방법
  • Observable#notifyObservers
@SuppressWarnings("deprecation")
static class IntObservable extends Observable implements Runnable{
    @Override
    public void run() {
        for(int i=1; i<=10; i++){
            setChanged(); //데이터 바뀌었으니 가져가라고 설정
            notifyObservers(i); //push방식으로 옵저버에게 데이터를 주면
        }
    }
}

public static void main(String[] args){
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            //받아
            System.out.println(Thread.currentThread().getName() + " " + arg);
        }
    };
    IntObservable io = new IntObservable();
    io.addObserver(ob); //리스너 등록

    //io.run(); //실행

    //옵저버 패턴은 별도의 스레드에서 작업하기 수월
    ExecutorService es = Executors.newSingleThreadExecutor(); //따로 할당받은 스레드에서
    es.execute(io); //run이 실행됨
    System.out.println(Thread.currentThread().getName() + " DONE"); //메인 스레드에서 실행
    es.shutdown(); //스레드 종료
}

추가) 위 내용에 대해 이해하기 위해서는 옵저버 패턴(Observer pattern)을 필수로 알아야 한다.

옵저버 패턴? 
객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴이다.
한 객체의 상태가 바뀌면 그 객체에 의존하는 다른 객체들한테 연락이 가고, 자동으로 내용이 갱신되는 방식으로 일대다(one-to-many) 의존성을 정의한다.

위 내용에 맞게 스스로 관련 내용을 구현할 수 있지만 자바에서 제공하는 Observer 인터페이스와 Observable 클래스를 가지고 구현할 수도 있다.

Observable 클래스를 상속받은 게 "데이터 제공자" Observer 인터페이스를 구현한 게 "데이터 받는 자"이다.

참고로 Observable클래스는 Java9부터 deprecated라서 관련 경고를 없애기 위해 아래 어노테이션을 넣는다.

@SuppressWarnings("deprecation")

 

옵저버 패턴에서는 다음과 같은 이슈가 있다.

  1. complete? 마지막이 언제인지 알기 힘들다.
  2. error? 진행되다 익셉션이 나면? 전파되는 방식이나 에러는 어떻게 처리? fallback?  관련 아이디어가 패턴에 녹아져 있지 않다.

이에 나오게 된 사상이 reactive stream이다.

 

2. reactive stream(스펙)

reactive stream이란 non-blocking(논 블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙으로 아래와 같은 특징이 있다.

  1. 잠재적으로 무한한 양의 데이터 처리
  2. 순서대로 처리
  3. 데이터를 비동기적으로 전달
  4. backpressure를 이용한 데이터 흐름 제어

또한 4가지 컴포넌트로 구성되어 있는데 다음과 같다.

  1. publisher: 데이터 제공자(옵저버 패턴의observable)
    • Publisher.subscribe(Subscriber)
  2. subscriber: 데이터를 받아서 사용하는 쪽(옵저버 패턴의 observer)
    • onSubscribe, onNext ...
  3. subscription
  4. proessor

참고로 Java9에 추가된 Flow는 reactvie stream 스펙을 채택하여 사용하고 있다. reative stream 스펙에 대한 내용은 여기에서 확인하기로 하고 이제 소스를 살펴보자.

public static void main(String[] args) throws InterruptedException {
    Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
    ExecutorService es = Executors.newSingleThreadExecutor();

    Flow.Publisher p = new Flow.Publisher() {
        @Override
        public void subscribe(Flow.Subscriber subscriber) {
            Iterator<Integer> it = itr.iterator();

            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //몇 개줄까; 갯수만큼 받고 싶어
                    es.execute(() -> {
                    //스레스 한정 위반이라 n을 안에서 수정 못해, i를 별도로 설정
                        int i = 0;
                        try{
                            while(i++ < n){
                                if(it.hasNext()){
                                    subscriber.onNext(it.next());
                                }else {
                                    subscriber.onComplete();
                                    break;
                                }
                            }
                        }catch (RuntimeException e){
                            subscriber.onError(e);
                        }
                    });
                }

                @Override
                public void cancel() { //중간에 작업 취소 시킬 수 있음

                }
            });
        }
    };

    Flow.Subscriber<Integer> s = new Flow.Subscriber<Integer>() {
        Flow.Subscription subscription; //잠시 저장하자
        @Override
        public void onSubscribe(Flow.Subscription subscription) { //구독이 시작되었어
            System.out.println(Thread.currentThread().getName() + " > onSubscribe ");
            //request
            this.subscription = subscription;
            this.subscription.request(1); //Long.MAX = 다 내놔
        }

        @Override
        public void onNext(Integer item) { //update; 데이터 주면 받아서 처리해; 끌어와 데이터를
            System.out.println(Thread.currentThread().getName() + " > onNext " + item); //받을 준비 되었어
            //버퍼사이즈, cpu 등에 따라 별도로 조절가능
            this.subscription.request(1);//하나 줘
        }

        @Override
        public void onError(Throwable throwable) { //에러가 나면 에러를 나 줘; 재시도 등 처리가능; 구독 끝
            System.out.println(Thread.currentThread().getName() + " > onError " + throwable.getMessage());
        }

        @Override
        public void onComplete() { //다 완료되었어; 구독 끝
            System.out.println(Thread.currentThread().getName() + " > onComplete ");
        }
    };

    p.subscribe(s); //s가 p를 리슨
   // es.shutdown();
    es.awaitTermination(10, TimeUnit.SECONDS);
    es.shutdown();
}

소스 각 라인에 주석으로 설명을 달았으며 큰 흐름은 아래 그림을 보고 이해하면 된다.

reactive stream from line

여기서의 특징은 publisher와 subscriber 사이에 subscription이라는 데이터이자 중계자를 거쳐서 구독과 관련된 설정을 할 수 있다는 점이다.

 


reactive stream 표준 및 명세: https://www.reactive-streams.org/

 

https://www.reactive-streams.org/

Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. JDK9 java

www.reactive-streams.org

reactive stream 설명: https://engineering.linecorp.com/en/blog/reactive-streams-armeria-1/

 

Let's Play with Reactive Streams on Armeria - Part 1 - LINE ENGINEERING

What is Reactive Streams? In this post, I'd like to introduce the basic concept of Reactive Streams, and how to use Reactive Streams with Armeria, the

engineering.linecorp.com

https://sabarada.tistory.com/98

 

[Java] Reactive Stream 이란?

reactive stream이란 non-blocking(넌블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. java의 RxJava, Spring5 Webflux의 Core에 있는 ProjectReactor 프로젝트 모두 해당..

sabarada.tistory.com

 

728x90
반응형

+ Recent posts