오늘은 두 번째 강의이다.
operator라고 publisher와 subscriber사이에서 데이터 변형을 도와주는 것을 만들어본다.
publisher -> [data1] -> operator1 -> [data2] -> oper2 -> [data3] -> subscriber
이렇게 operator1은 publisher에게는 subscriber, oper2에게는 publisher의 역할을 하게 된다.
1. map (1-1 mapping) 작성
public static void main(String[] args){
List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
.limit(10) //제한을 줘야함
.collect(Collectors.toList());
Flow.Publisher<Integer> pub = iterPub(list);
Flow.Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10); //d2로 만든다
Flow.Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer, Integer>) s -> -s); //d3로 만든다
map2Pub.subscribe(logSub());
}
private static Flow.Publisher<Integer> mapPub(Flow.Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Flow.Publisher<Integer>() {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) { //logSub
pub.subscribe(new DelegateSub(subscriber) {
@Override
public void onNext(Integer item) {
subscriber.onNext(f.apply(item));
}
});
}
};
}
private static Flow.Subscriber<Integer> logSub() {
return new Flow.Subscriber<Integer>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.debug("onSubscribe!");
subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
//subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
}
@Override
public void onNext(Integer item) { //정상 데이터
log.debug("onNext:{}", item);
}
@Override
public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
log.debug("onError:{}", throwable);
}
@Override
public void onComplete() { //완료 시그널
log.debug("onComplete!");
}
};
}
private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
return new Flow.Publisher<Integer>() {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
//반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) { //갯수는 신경쓰지 말자
try{
iter.forEach(s -> subscriber.onNext(s));
subscriber.onComplete(); //다 보내면 완료를 알려줘야함
}catch (Throwable t){
subscriber.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
위 코드에서 흐름은 아래와 같다. pub -> logSub로 데이터가 흘러가는걸 downstream이라고 하고 반대 방향으로 올라오는 것을 upstream이라고 부른다.
pub -> [data1] -> mapPub -> [data2] -> logSub
<- subscribe(logSub)
-> onSubscribe(s)
-> onNext
-> onNext
-> onComplete
일대일 매핑 방식이라 (1-> 10; 2-> 20; ...) mapSub의 onNext에서 매번 logSub의 onNext를 불러 데이터를 전송하는 것을 알 수 있다.
2. reduce 작성
public static void main(String[] args){
List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
.limit(10) //제한을 줘야함
.collect(Collectors.toList());
Flow.Publisher<Integer> pub = iterPub(list);
//Flow.Publisher<Integer> sumPub = sumPub(pub); //합계
//sumPub.subscribe(logSub());
Flow.Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer, Integer, Integer>)(a, b) -> a+b ); //합계
reducePub.subscribe(logSub());
}
//1, 2, 3, 4, 5
// 0 -> (0, 1) -> 1
// 1 -> (1, 2) -> 3
// 3 -> (3, 3) -> 6
// ...
private static Flow.Publisher<Integer> reducePub(Flow.Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
return new Flow.Publisher<Integer>() {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
pub.subscribe(new DelegateSub(subscriber){
int result = init;
@Override
public void onNext(Integer item) {
result = bf.apply(result, item);
}
@Override
public void onComplete() {
subscriber.onNext(result);
subscriber.onComplete();
}
});
}
};
}
private static Flow.Publisher<Integer> sumPub(Flow.Publisher<Integer> pub) {
return new Flow.Publisher<Integer>() {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
pub.subscribe(new DelegateSub(subscriber){
int sum = 0;
@Override
public void onNext(Integer item) {
sum += item;
}
@Override
public void onComplete() {
subscriber.onNext(sum);
subscriber.onComplete();
}
});
}
};
}
private static Flow.Subscriber<Integer> logSub() {
return new Flow.Subscriber<Integer>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.debug("onSubscribe!");
subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
//subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
}
@Override
public void onNext(Integer item) { //정상 데이터
log.debug("onNext:{}", item);
}
@Override
public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
log.debug("onError:{}", throwable);
}
@Override
public void onComplete() { //완료 시그널
log.debug("onComplete!");
}
};
}
private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
return new Flow.Publisher<Integer>() {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
//반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) { //갯수는 신경쓰지 말자
try{
iter.forEach(s -> subscriber.onNext(s));
subscriber.onComplete(); //다 보내면 완료를 알려줘야함
}catch (Throwable t){
subscriber.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
합계와 같이 n개의 데이터가 하나의 데이터로 흡수되어 내려갈 때에는 onNext에서는 필요한 계산을 하고 onComplete에서 부모의 onNext를 불러 데이터를 한 번만 전송해주고, onComplete를 불러 더 이상 데이터가 없음을 알려준다.
3. generic으로 변환
public static void main(String[] args){
List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
.limit(10) //제한을 줘야함
.collect(Collectors.toList());
Flow.Publisher<Integer> pub = iterPub(list);
Flow.Publisher<String> mapPub = mapPub(pub, (Function<Integer, String>) s -> "[" +s+ "]"); //d2로 만든다
mapPub.subscribe(logSub());
//Flow.Publisher<String> reducePub = reducePub2(pub, "", (BiFunction<String, Integer, String>)(a, b) -> a + "-" + b );
Flow.Publisher<StringBuilder> reducePub = reducePub2(pub, new StringBuilder(), (a, b) -> a .append(b+","));
reducePub.subscribe(logSub());
}
//generic으로 전환
//publisher가 발행한 T(int) -> R(string)으로 전환해주는 매개체(pub이자 sub)
private static <T, R> Flow.Publisher<R> mapPub(Flow.Publisher<T> pub, Function<T, R> f) {
return new Flow.Publisher<R>() {
@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) { //logSub
pub.subscribe(new DelegateSub<T, R>(subscriber) {
@Override
public void onNext(T item) {
subscriber.onNext(f.apply(item));
}
});
}
};
}
private static <T, R> Flow.Publisher<R> reducePub2(Flow.Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
return new Flow.Publisher<R>() {
@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) {
pub.subscribe(new DelegateSub<T, R>(subscriber){
R result = init;
@Override
public void onNext(T item) {
result = bf.apply(result, item);
}
@Override
public void onComplete() {
subscriber.onNext(result);
subscriber.onComplete();
}
});
}
};
}
//publisher가 발행한 숫자를 변환한 최종 값(string)을 찍기만 하는 subscriber
private static <T> Flow.Subscriber<T> logSub() {
return new Flow.Subscriber<T>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.debug("onSubscribe!");
subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
//subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
}
@Override
public void onNext(T item) { //정상 데이터
log.debug("onNext:{}", item);
}
@Override
public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
log.debug("onError:{}", throwable);
}
@Override
public void onComplete() { //완료 시그널
log.debug("onComplete!");
}
};
}
4. reactor 라이브러리 사용
위에서 손으로 짠 코드는 사실 reactor lib를 사용하면 손쉽게 작성할 수 있다.
implementation 'io.projectreactor:reactor-core:3.4.15'
위 dependency를 추가하고 아래와 같이 진행한다.
public static void main(String[] args){
Flux.create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.subscribe(System.out::println);
}
//로그
0 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
16 [main] INFO reactor.Flux.Create.1 - request(unbounded)
18 [main] INFO reactor.Flux.Create.1 - onNext(1)
1 //sysout 결과
18 [main] INFO reactor.Flux.Create.1 - onNext(2)
2 //sysout 결과
18 [main] INFO reactor.Flux.Create.1 - onNext(3)
3 //sysout 결과
19 [main] INFO reactor.Flux.Create.1 - onComplete()
로그(log())를 같이 살펴보면
- flux의 .subscribe를 만나면 타고타고 올라가서 onSubscribe를 먼저 실행하고
- request(Long.MAX) -> unbounded로 표현되어 있는 것이고
- onNext -> -> onComplete를 실행하는 것을 알 수 있다.
<map>
public static void main(String[] args){
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.map(s -> s*10)
.log()
.subscribe(System.out::println);
}
14 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
15 [main] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
15 [main] INFO reactor.Flux.Map.2 - request(unbounded)
15 [main] INFO reactor.Flux.Create.1 - request(unbounded)
17 [main] INFO reactor.Flux.Create.1 - onNext(1)
17 [main] INFO reactor.Flux.Map.2 - onNext(10)
10 // sysout 결과
17 [main] INFO reactor.Flux.Create.1 - onNext(2)
17 [main] INFO reactor.Flux.Map.2 - onNext(20)
20 //sysout 결과
17 [main] INFO reactor.Flux.Create.1 - onNext(3)
17 [main] INFO reactor.Flux.Map.2 - onNext(30)
30 //sysout 결과
18 [main] INFO reactor.Flux.Create.1 - onComplete()
18 [main] INFO reactor.Flux.Map.2 - onComplete()
로그를 위아래 걸어줬더니 위와 같은 로그가 찍혔다. Flux.Create. 로그는 위 log(), Flux.Map. 로그는 아래 log()라고 보면 된다. 위에서는 1이 넘어갔지만 map operator를 건너서 10이 되어 넘어가는 것을 볼 수 있다.
<reduce>
public static void main(String[] args){
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.map(s -> s*10)
.log()
.reduce(0, (a, b) -> a+b)
.log()
.subscribe(System.out::println);
}
45 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
46 [main] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
46 [main] INFO reactor.Mono.ReduceSeed.3 - | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)
46 [main] INFO reactor.Mono.ReduceSeed.3 - | request(unbounded)
46 [main] INFO reactor.Flux.Map.2 - request(unbounded)
47 [main] INFO reactor.Flux.Create.1 - request(unbounded)
48 [main] INFO reactor.Flux.Create.1 - onNext(1)
48 [main] INFO reactor.Flux.Map.2 - onNext(10)
48 [main] INFO reactor.Flux.Create.1 - onNext(2)
48 [main] INFO reactor.Flux.Map.2 - onNext(20)
48 [main] INFO reactor.Flux.Create.1 - onNext(3)
49 [main] INFO reactor.Flux.Map.2 - onNext(30)
49 [main] INFO reactor.Flux.Create.1 - onComplete()
49 [main] INFO reactor.Flux.Map.2 - onComplete()
49 [main] INFO reactor.Mono.ReduceSeed.3 - | onNext(60)
60 //sysout 결과
49 [main] INFO reactor.Mono.ReduceSeed.3 - | onComplete()
reduce operator를 마지막에 추가하고 보면 매 숫자를 받아서(create) 10을 곱하고(map) onNext를 계속하다가 마지막에 오면(onComplete) reduce를 실행해서 최종적으로 60을 받는 것을 볼 수 있다.
위 1~3 과정을 거치고 reactor lib를 사용한 후 로그를 보니 왜 그렇게 로그가 지나가는지 이해하기가 쉬웠다. 왜 굳이 예전 방식으로 코딩을 하시는걸까 궁금했는데 기본적인 원리를 손으로 짜보고 나니까 흐름을 파악하는데 큰 도움이 되는 것 같다!
'개발 > reactive' 카테고리의 다른 글
[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.03.18 |
---|---|
[reactive] 4-2. spring 비동기를 위한 interfaces/classes (0) | 2022.03.17 |
[reactive] 4-1. java Future/FutureTask/Callable/Runnable (0) | 2022.03.16 |
[reactive] 3. reactive streams - schedulers (0) | 2022.03.03 |
[reactive] 1. reactive-streams (0) | 2022.02.25 |