[reactive] 3. reactive streams - schedulers
방푸린
2022. 3. 3. 17:44
Notes on the third lecture.
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }
            @Override
            public void cancel() {
            }
        });
    };
    pub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); // request everything
        }
        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }
        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }
        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0 [main] DEBUG toby.live.SchedulerEx - onSubscribe
2 [main] DEBUG toby.live.SchedulerEx - onNext:1
2 [main] DEBUG toby.live.SchedulerEx - onNext:2
2 [main] DEBUG toby.live.SchedulerEx - onNext:3
2 [main] DEBUG toby.live.SchedulerEx - onNext:4
2 [main] DEBUG toby.live.SchedulerEx - onNext:5
2 [main] DEBUG toby.live.SchedulerEx - onComplete
EXXXXIT
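Running the code above prints the log shown above. As you can see, the main thread does all the work, so if anything blocks along the way, the main thread cannot make progress. How can we hand the work between pub and sub to another thread so that the main thread simply moves on?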
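To make the blocking point concrete, here is a minimal sketch that is not from the lecture. It assumes a hypothetical slow publisher that sleeps 50 ms before each item; the class name `BlockingSubscribeSketch` and the `run()` helper are made up for illustration. Because everything runs on the calling thread, `subscribe` should not return until the publisher has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BlockingSubscribeSketch {

    // Hypothetical helper: records the order of signals and of the line after subscribe().
    static List<String> run() {
        List<String> events = new ArrayList<>(); // single thread only, so no synchronization needed

        // Assumed slow publisher: sleeps on the calling thread before each item.
        Flow.Publisher<Integer> slowPub = s -> s.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                for (int i = 1; i <= 2; i++) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    s.onNext(i);
                }
                s.onComplete();
            }

            @Override
            public void cancel() {
            }
        });

        slowPub.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });

        events.add("after subscribe"); // reached only once the publisher is done
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch "after subscribe" is always recorded last, which is the same behaviour as EXXXXIT appearing at the end of the log.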
subscribeOn
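- Use when the publisher is slow or unpredictable and the consumer is fast
- Moves the subscription itself (onSubscribe, request, and the data generation that follows) onto a separate thread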
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }
            @Override
            public void cancel() {
            }
        });
    };
    Flow.Publisher<Integer> subOnPub = subscriber -> {
        // single thread; if tasks pile up they are queued and run sequentially
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(() -> {
            pub.subscribe(subscriber);
        });
    };
    subOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); // request everything
        }
        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }
        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }
        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onSubscribe
2 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:1
2 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:2
2 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:3
2 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:4
2 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:5
2 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onComplete
Unlike before, the main thread prints EXXXXIT first, and the rest of the work runs on the separate thread.
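The wrapper above can be factored into a reusable method. The following is only a sketch under assumptions: the names `SubscribeOnSketch`, `subscribeOn`, and `run` are hypothetical, and shutting the executor down right after submitting the task is an addition that the lecture code does not make (without it, the non-daemon pool thread keeps the JVM alive).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

class SubscribeOnSketch {

    // Hypothetical helper: runs source.subscribe(...) on a dedicated, named thread.
    static <T> Flow.Publisher<T> subscribeOn(Flow.Publisher<T> source, String threadName) {
        return subscriber -> {
            ExecutorService es = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
            es.execute(() -> source.subscribe(subscriber));
            es.shutdown(); // the submitted task still runs; the thread ends afterwards
        };
    }

    // Subscribes to a synchronous 1..3 publisher and records which thread saw each signal.
    static List<String> run() {
        Flow.Publisher<Integer> pub = s -> s.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                for (int i = 1; i <= 3; i++) {
                    s.onNext(i);
                }
                s.onComplete();
            }

            @Override
            public void cancel() {
            }
        });

        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        subscribeOn(pub, "subOn-1").subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add(Thread.currentThread().getName() + " onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                events.add(Thread.currentThread().getName() + " onNext:" + item);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                events.add(Thread.currentThread().getName() + " onComplete");
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS); // the caller is free; it only waits here to collect results
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every recorded signal, including onSubscribe, carries the `subOn-1` thread name, which matches the pattern in the log above.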
publishOn
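- Use when data generation (publisher) is fast but processing (consumer) is slow
- Moves delivery to the subscriber (onNext, onError, onComplete) onto a separate thread
- Data is generated quickly on the calling thread (main here); consuming it is slow, so that part runs on a separate thread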
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }
            @Override
            public void cancel() {
            }
        });
    };
    // typed as Flow.Publisher<Integer> rather than a raw type
    Flow.Publisher<Integer> pubOnPub = subscriber -> {
        pub.subscribe(new Flow.Subscriber<Integer>() { // intermediate operator
            // single thread: tasks are queued, so the signal order is preserved
            ExecutorService es = Executors.newSingleThreadExecutor();
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // the callbacks below should run on a separate thread
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }
            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }
            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };
    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); // request everything
        }
        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }
        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }
        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0 [main] DEBUG toby.live.SchedulerEx - onSubscribe
1 [main] DEBUG toby.live.SchedulerEx - subscription request
EXXXXIT
4 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:1
5 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:2
5 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:3
5 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:4
5 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onNext:5
5 [pool-1-thread-1] DEBUG toby.live.SchedulerEx - onComplete
In this version onSubscribe, the request, and EXXXXIT all happen on the main thread, and everything else runs on the separately assigned thread.
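The same idea can be written as a reusable operator. This is a sketch under assumptions, not the lecture's code: `PublishOnSketch`, `publishOn`, and `run` are hypothetical names, and the executor is shut down after a terminal signal, which the code above does not do. The recorded label "caller" stands for whichever thread invoked `run()`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

class PublishOnSketch {

    // Hypothetical helper: forwards onNext/onError/onComplete on a dedicated, named thread.
    static <T> Flow.Publisher<T> publishOn(Flow.Publisher<T> source, String threadName) {
        return subscriber -> source.subscribe(new Flow.Subscriber<T>() {
            final ExecutorService es =
                    Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription); // stays on the upstream (calling) thread
            }

            @Override
            public void onNext(T item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
                es.shutdown(); // queued signals still run, then the thread ends
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
                es.shutdown();
            }
        });
    }

    // Records "<thread> <signal>" for a synchronous 1..3 publisher.
    static List<String> run() {
        Thread caller = Thread.currentThread();
        Flow.Publisher<Integer> pub = s -> s.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                for (int i = 1; i <= 3; i++) {
                    s.onNext(i);
                }
                s.onComplete();
            }

            @Override
            public void cancel() {
            }
        });

        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        publishOn(pub, "pubOn-1").subscribe(new Flow.Subscriber<Integer>() {
            private void record(String signal) {
                Thread t = Thread.currentThread();
                events.add((t == caller ? "caller" : t.getName()) + " " + signal);
            }

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                record("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                record("onNext:" + item);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                record("onComplete");
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here only onSubscribe is recorded on the calling thread; the data signals and onComplete are recorded on `pubOn-1`, mirroring the log above.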
The publishOn and subscribeOn ideas can also be used together.
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }
            @Override
            public void cancel() {
            }
        });
    };
    Flow.Publisher<Integer> subOnPub = subscriber -> {
        // single thread; if tasks pile up they are queued and run sequentially
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
            @Override
            public String getThreadNamePrefix() {
                return "subOn-";
            }
        });
        es.execute(() -> {
            pub.subscribe(subscriber);
        });
    };
    // typed as Flow.Publisher<Integer> rather than a raw type
    Flow.Publisher<Integer> pubOnPub = subscriber -> {
        subOnPub.subscribe(new Flow.Subscriber<Integer>() { // intermediate operator
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                @Override
                public String getThreadNamePrefix() {
                    return "pubOn-";
                }
            });
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // the callbacks below should run on a separate thread
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }
            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }
            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };
    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); // request everything
        }
        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }
        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }
        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0 [subOn-1] DEBUG toby.live.SchedulerEx - onSubscribe
2 [subOn-1] DEBUG toby.live.SchedulerEx - subscription request
4 [pubOn-1] DEBUG toby.live.SchedulerEx - onNext:1
4 [pubOn-1] DEBUG toby.live.SchedulerEx - onNext:2
4 [pubOn-1] DEBUG toby.live.SchedulerEx - onNext:3
4 [pubOn-1] DEBUG toby.live.SchedulerEx - onNext:4
4 [pubOn-1] DEBUG toby.live.SchedulerEx - onNext:5
4 [pubOn-1] DEBUG toby.live.SchedulerEx - onComplete
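With each thread given a name, the log shows that main went its own way (EXXXXIT), onSubscribe and request ran on the subOn thread, and everything else ran on the pubOn thread.
That is the principle; implemented with Flux it looks like the code below. Depending on the situation you can use only one of publishOn/subscribeOn, or both.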
public static void main(String[] args) {
    Flux.range(1, 10) // 10 values starting from 1
        .publishOn(Schedulers.newSingle("pub"))
        .log()
        .subscribeOn(Schedulers.newSingle("sub"))
        .subscribe(System.out::println); // onNext
    System.out.println("exit");
}
exit
32 [sub-1] INFO reactor.Flux.PublishOn.1 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
33 [sub-1] INFO reactor.Flux.PublishOn.1 - | request(unbounded)
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(1)
1
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(2)
2
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(3)
3
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(4)
4
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(5)
5
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(6)
6
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(7)
7
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(8)
8
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(9)
9
34 [pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(10)
10
35 [pub-2] INFO reactor.Flux.PublishOn.1 - | onComplete()
Flux.interval
Write code in which the main thread moves on while a separate thread receives one number every 200 ms, and the program ends once 5 seconds have passed.
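public static void main(String[] args) throws InterruptedException {
    // the JVM is not terminated while at least one user thread is alive
    // once only daemon threads remain, the JVM terminates
    Flux.interval(Duration.ofMillis(200)) // emits an increasing number at a fixed period, indefinitely
        .take(10) // stop after 10 items have been received
        .subscribe(s -> log.debug("onNext::{}", s));
    log.debug("Exit");
    TimeUnit.SECONDS.sleep(5); // so a user thread (main here) has to stay alive for the interval to run
}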
15 [main] DEBUG toby.live.FluxScEx - Exit
223 [parallel-1] DEBUG toby.live.FluxScEx - onNext::0
419 [parallel-1] DEBUG toby.live.FluxScEx - onNext::1
619 [parallel-1] DEBUG toby.live.FluxScEx - onNext::2
818 [parallel-1] DEBUG toby.live.FluxScEx - onNext::3
1015 [parallel-1] DEBUG toby.live.FluxScEx - onNext::4
1219 [parallel-1] DEBUG toby.live.FluxScEx - onNext::5
1419 [parallel-1] DEBUG toby.live.FluxScEx - onNext::6
1618 [parallel-1] DEBUG toby.live.FluxScEx - onNext::7
1817 [parallel-1] DEBUG toby.live.FluxScEx - onNext::8
2019 [parallel-1] DEBUG toby.live.FluxScEx - onNext::9
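The comments in the code above rely on the difference between user and daemon threads. The following sketch shows the same situation without Reactor, assuming a timer that runs on a daemon thread; `DaemonIntervalSketch`, `run`, and the thread name `interval-1` are hypothetical. Instead of sleeping for a fixed 5 seconds, the caller waits on a latch until the expected number of ticks has arrived.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class DaemonIntervalSketch {

    // Hypothetical helper: collects `count` ticks emitted every `periodMillis` by a daemon thread.
    static List<Long> run(int count, long periodMillis) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "interval-1");
            t.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
            return t;
        });

        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        AtomicLong counter = new AtomicLong();

        exec.scheduleAtFixedRate(() -> {
            if (done.getCount() == 0) {
                return; // enough ticks already delivered
            }
            received.add(counter.getAndIncrement());
            done.countDown();
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            // The calling (user) thread must stay alive, otherwise the JVM could exit
            // before the daemon thread emits anything.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        exec.shutdownNow();
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(run(10, 200));
    }
}
```

If `main` returned right after scheduling the task, the process could end with nothing printed, which is why the Reactor example keeps the main thread alive with a sleep.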
What if we implemented this without Reactor?
It can be built by combining Executors.newSingleThreadScheduledExecutor() with its scheduleAtFixedRate method and a cancel implementation.
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = sub -> {
        sub.onSubscribe(new Flow.Subscription() {
            int no = 0;
            boolean cancelled = false;
            @Override
            public void request(long n) {
                ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                exec.scheduleAtFixedRate(() -> { // interval
                    if (cancelled) {
                        exec.shutdown();
                        return;
                    }
                    sub.onNext(no++);
                }, 0, 300, TimeUnit.MILLISECONDS); // keep emitting at this period until cancelled
            }
            @Override
            public void cancel() {
                log.debug("cancel called");
                cancelled = true;
            }
        });
    };
    Flow.Publisher<Integer> takePub = sub -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {
            int count = 0;
            Flow.Subscription subs;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subs = subscription; // store first so onNext can never see a null subscription
                sub.onSubscribe(subscription);
            }
            @Override
            public void onNext(Integer item) {
                sub.onNext(item);
                if (++count >= 5) {
                    subs.cancel();
                }
            }
            @Override
            public void onError(Throwable throwable) {
                sub.onError(throwable);
            }
            @Override
            public void onComplete() {
                sub.onComplete();
            }
        });
    };
    takePub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); // request everything
        }
        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }
        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }
        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
}
0 [main] DEBUG toby.live.IntervalEx - onSubscribe
8 [pool-1-thread-1] DEBUG toby.live.IntervalEx - onNext:0
309 [pool-1-thread-1] DEBUG toby.live.IntervalEx - onNext:1
607 [pool-1-thread-1] DEBUG toby.live.IntervalEx - onNext:2
908 [pool-1-thread-1] DEBUG toby.live.IntervalEx - onNext:3
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx - onNext:4
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx - cancel called
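The log above ends with "cancel called" and no onComplete, because the take operator only cancels upstream. One possible variation, sketched here under assumptions, also signals onComplete downstream once the limit is reached and stops the timer directly in cancel(). The names `IntervalTakeSketch`, `interval`, `take`, and `run` are hypothetical and not part of the lecture code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class IntervalTakeSketch {

    // Emits 0, 1, 2, ... every periodMillis until the subscription is cancelled.
    static Flow.Publisher<Integer> interval(long periodMillis) {
        return sub -> sub.onSubscribe(new Flow.Subscription() {
            final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
            final AtomicInteger no = new AtomicInteger();
            final AtomicBoolean started = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (!started.compareAndSet(false, true)) {
                    return; // schedule the periodic task only once
                }
                exec.scheduleAtFixedRate(() -> sub.onNext(no.getAndIncrement()),
                        0, periodMillis, TimeUnit.MILLISECONDS);
            }

            @Override
            public void cancel() {
                exec.shutdown(); // by default this also stops the periodic task
            }
        });
    }

    // Forwards at most `limit` items, then cancels upstream and completes downstream.
    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, int limit) {
        return sub -> source.subscribe(new Flow.Subscriber<T>() {
            Flow.Subscription upstream;
            int count = 0;
            boolean done = false;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                sub.onSubscribe(subscription);
            }

            @Override
            public void onNext(T item) {
                if (done) {
                    return;
                }
                sub.onNext(item);
                if (++count >= limit) {
                    done = true;
                    upstream.cancel();
                    sub.onComplete();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                if (!done) {
                    done = true;
                    sub.onError(throwable);
                }
            }

            @Override
            public void onComplete() {
                if (!done) {
                    done = true;
                    sub.onComplete();
                }
            }
        });
    }

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);
        take(interval(20), 3).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable throwable) {
                finished.countDown();
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
                finished.countDown();
            }
        });
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because cancel() shuts the scheduler down, its worker thread ends after the current tick and the JVM can exit without an extra flag check on the next tick.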