개발/reactive

[reactive] 3. reactive streams - schedulers

방푸린 2022. 3. 3. 17:44
반응형

세 번째 강의 시작한다.

 

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    pub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [main] DEBUG toby.live.SchedulerEx  - onNext:1
2    [main] DEBUG toby.live.SchedulerEx  - onNext:2
2    [main] DEBUG toby.live.SchedulerEx  - onNext:3
2    [main] DEBUG toby.live.SchedulerEx  - onNext:4
2    [main] DEBUG toby.live.SchedulerEx  - onNext:5
2    [main] DEBUG toby.live.SchedulerEx  - onComplete
EXXXXIT

위 코드를 돌리면 아래와 같이 로그가 찍힌다. 보이는 것처럼 main 스레드가 모든 일을 다 하고 있고, 만약 중간에 블로킹이 걸리면 main스레드는 진행을 하지 못하게 된다. pub - sub 사이의 일은 다른 스레드가 하게끔 하고 main스레드는 지나치게 하려면 어떻게 할까

 

subscribeOn

  • 주는자(publisher)가 느리거나 예측 불가할 경우 & 처리(consumer)가 빠를 때
  • subscriber를 별도의 스레드로 분리
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(); //1개 스레드, 몰리면 큐에 넣고 순차처리
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    subOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

아까랑 다르게 main 스레드에서 exit을 먼저 하고 나머지 스레드로 작업한다.

 

publishOn

  • 데이터 생성(publisher)은 빠른데 처리하는 곳(consumer)이 느릴 때 
  • publisher를 별도의 스레드로 분리
  • 데이터 생성은 메인에서 빨리해 근데 가져가는 건 느리니까 별도에서 진행해
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(); //코어 갯수가 하나라서 큐에 넣어서 안꼬임

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
1    [main] DEBUG toby.live.SchedulerEx  - subscription request
EXXXXIT
4    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

이 친구는 onSubscribe, exit까지 main 스레드에서 해버리고 나머지를 할당한 별도의 스레드에서 진행한다.

 

publishOn / subscribeOn 개념을 둘 다 사용할 수도 있다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){ //1개 스레드, 몰리면 큐에 넣고 순차처리
            @Override
            public String getThreadNamePrefix() {
                return "subOn-";
            }
        }); 
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        subOnPub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                @Override
                public String getThreadNamePrefix() {
                    return "pubOn-";
                }
            });

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [subOn-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [subOn-1] DEBUG toby.live.SchedulerEx  - subscription request
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:1
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:2
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:3
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:4
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:5
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onComplete

각 스레드에 이름을 주어 로그를 확인해봤더니, 메인은 메인대로 진행하였고(exxit) onSubscribe, request 까지는 subOn 스레드에서 진행하였고 나머지는 pubOn 스레드에서 진행함을 알 수 있다.

 

원리는 이렇고, flux로 구현하면 아래와 같다. 상황에 따라 publishOn/subscribeOn을 하나만 쓸 수도, 모두 쓸 수도 있겠다.

public static void main(String[] args) {
    Flux.range(1, 10) //1부터 10개
        .publishOn(Schedulers.newSingle("pub"))
        .log()
        .subscribeOn(Schedulers.newSingle("sub"))
        .subscribe(System.out::println); //onNext
    System.out.println("exit");
}
exit
32   [sub-1] INFO  reactor.Flux.PublishOn.1  - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
33   [sub-1] INFO  reactor.Flux.PublishOn.1  - | request(unbounded)
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(1)
1
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(2)
2
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(3)
3
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(4)
4
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(5)
5
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(6)
6
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(7)
7
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(8)
8
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(9)
9
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(10)
10
35   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onComplete()

 

flux.interval

메인 스레드 종료 후 별도 스레드에서 200ms마다 하나씩 숫자를 받아서 실행. 5초 지나면 종료되는 코드 작성.

public static void main(String[] args) throws InterruptedException {

    //user thread 하나라도 있으면 강종안함
    //daemon thread 만 남으면 강종
    Flux.interval(Duration.ofMillis(200))//주기를 가지고 숫자를 증가; 무한으로
        .take(10) //10개 받으면 끝; 중지시킨다
        .subscribe(s -> log.debug("onNext::{}", s));

    log.debug("Exit");
    TimeUnit.SECONDS.sleep(5); //그래서 유저 스래드를 만들어줘야 실행됨
}
15   [main] DEBUG toby.live.FluxScEx  - Exit
223  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::0
419  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::1
619  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::2
818  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::3
1015 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::4
1219 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::5
1419 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::6
1618 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::7
1817 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::8
2019 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::9

 

이를 reactor stream 없이 구현한다면?

newSingleThreadScheduledExecutor.scheduleAtFixedRate 이란 함수와 cancel을 복합 구현하여 만들면 된다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = sub -> {
        sub.onSubscribe(new Flow.Subscription() {
            int no = 0;
            boolean cancelled = false;

            @Override
            public void request(long n) {
                ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

                exec.scheduleAtFixedRate(() -> {  //interval
                    if(cancelled){
                        exec.shutdown();
                        return;
                    }
                    sub.onNext(no++);
                }, 0, 300, TimeUnit.MILLISECONDS); //죽을 때 까지 계속 저 주기로 날려
            }

            @Override
            public void cancel() {
                log.debug("cancel called");
                cancelled = true;
            }
        });
    };

    Flow.Publisher<Integer> takePub = sub -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {
            int count = 0;
            Flow.Subscription subs;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                sub.onSubscribe(subscription);
                subs = subscription;
            }

            @Override
            public void onNext(Integer item) {
                sub.onNext(item);
                if(++count >= 5){
                    subs.cancel();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                sub.onError(throwable);
            }

            @Override
            public void onComplete() {
                sub.onComplete();
            }
        });
    };

    takePub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
}
0    [main] DEBUG toby.live.IntervalEx  - onSubscribe
8    [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:0
309  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:1
607  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:2
908  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:3
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:4
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - cancel called
728x90
반응형