개발/reactive

[reactive] 2. reactive streams - operators

방푸린 2022. 2. 25. 16:59
반응형

오늘은 두 번째 강의이다.

 

operator라고 publisher와 subscriber사이에서 데이터 변형을 도와주는 것을 만들어본다.

publisher -> [data1] -> operator1 -> [data2] -> oper2 -> [data3] -> subscriber

이렇게 operator1은 publisher에게는 subscriber, oper2에게는 publisher의 역할을 하게 된다. 

 

1. map (1-1 mapping) 작성

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    Flow.Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10); //d2로 만든다
    Flow.Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer, Integer>) s -> -s); //d3로 만든다
    map2Pub.subscribe(logSub());
}

private static Flow.Publisher<Integer> mapPub(Flow.Publisher<Integer> pub, Function<Integer, Integer> f) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) { //logSub
            pub.subscribe(new DelegateSub(subscriber) {
                @Override
                public void onNext(Integer item) {
                    subscriber.onNext(f.apply(item));
                }
            });
        }
    };
}

private static Flow.Subscriber<Integer> logSub() {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(Integer item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            //반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //갯수는 신경쓰지 말자
                    try{
                        iter.forEach(s -> subscriber.onNext(s));
                        subscriber.onComplete(); //다 보내면 완료를 알려줘야함
                    }catch (Throwable t){
                        subscriber.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    };
}

위 코드에서 흐름은 아래와 같다. pub -> logSub로 데이터가 흘러가는걸 downstream이라고 하고 반대 방향으로 올라오는 것을 upstream이라고 부른다.

pub -> [data1] -> mapPub -> [data2] -> logSub
                        <- subscribe(logSub)
                        -> onSubscribe(s)
                        -> onNext
                        -> onNext
                        -> onComplete

일대일 매핑 방식이라 (1-> 10; 2-> 20; ...) mapSub의 onNext에서 매번 logSub의 onNext를 불러 데이터를 전송하는 것을 알 수 있다.

 

2. reduce 작성

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    //Flow.Publisher<Integer> sumPub = sumPub(pub); //합계
    //sumPub.subscribe(logSub());

    Flow.Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer, Integer, Integer>)(a, b) -> a+b ); //합계
    reducePub.subscribe(logSub());
}

//1, 2, 3, 4, 5
// 0 -> (0, 1) -> 1
// 1 -> (1, 2) -> 3
// 3 -> (3, 3) -> 6
// ...
private static Flow.Publisher<Integer> reducePub(Flow.Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber){
                int result = init;
                @Override
                public void onNext(Integer item) {
                    result = bf.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}

private static Flow.Publisher<Integer> sumPub(Flow.Publisher<Integer> pub) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber){
                int sum = 0;
                @Override
                public void onNext(Integer item) {
                    sum += item;
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(sum);
                    subscriber.onComplete();
                }
            });
        }
    };
}

private static Flow.Subscriber<Integer> logSub() {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(Integer item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            //반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //갯수는 신경쓰지 말자
                    try{
                        iter.forEach(s -> subscriber.onNext(s));
                        subscriber.onComplete(); //다 보내면 완료를 알려줘야함
                    }catch (Throwable t){
                        subscriber.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    };
}

합계와 같이 n개의 데이터가 하나의 데이터로 흡수되어 내려갈 때에는 onNext에서는 필요한 계산을 하고 onComplete에서 부모의 onNext를 불러 데이터를 한 번만 전송해주고, onComplete를 불러 더 이상 데이터가 없음을 알려준다.

 

3. generic으로 변환

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    Flow.Publisher<String> mapPub = mapPub(pub, (Function<Integer, String>) s -> "[" +s+ "]"); //d2로 만든다
    mapPub.subscribe(logSub());

    //Flow.Publisher<String> reducePub = reducePub2(pub, "", (BiFunction<String, Integer, String>)(a, b) -> a + "-" + b );
    Flow.Publisher<StringBuilder> reducePub = reducePub2(pub, new StringBuilder(), (a, b) -> a .append(b+","));
    reducePub.subscribe(logSub());

}

//generic으로 전환
//publisher가 발행한 T(int) -> R(string)으로 전환해주는 매개체(pub이자 sub)
private static <T, R> Flow.Publisher<R> mapPub(Flow.Publisher<T> pub, Function<T, R> f) {
    return new Flow.Publisher<R>() {
        @Override
        public void subscribe(Flow.Subscriber<? super R> subscriber) { //logSub
            pub.subscribe(new DelegateSub<T, R>(subscriber) {
                @Override
                public void onNext(T item) {
                    subscriber.onNext(f.apply(item));
                }
            });
        }
    };
}

private static <T, R> Flow.Publisher<R> reducePub2(Flow.Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
    return new Flow.Publisher<R>() {
        @Override
        public void subscribe(Flow.Subscriber<? super R> subscriber) {
            pub.subscribe(new DelegateSub<T, R>(subscriber){
                R result = init;

                @Override
                public void onNext(T item) {
                    result = bf.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}

//publisher가 발행한 숫자를 변환한 최종 값(string)을 찍기만 하는 subscriber
private static <T> Flow.Subscriber<T> logSub() {
    return new Flow.Subscriber<T>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(T item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

 

4. reactor 라이브러리 사용

위에서 손으로 짠 코드는 사실 reactor lib를 사용하면 손쉽게 작성할 수 있다.

implementation 'io.projectreactor:reactor-core:3.4.15'

위 dependency를 추가하고 아래와 같이 진행한다.

 

public static void main(String[] args){
    Flux.create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .subscribe(System.out::println);
}
//로그
0    [main] DEBUG reactor.util.Loggers  - Using Slf4j logging framework
15   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
16   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
18   [main] INFO  reactor.Flux.Create.1  - onNext(1)
1    //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onNext(2)
2    //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onNext(3)
3    //sysout 결과
19   [main] INFO  reactor.Flux.Create.1  - onComplete()

로그(log())를 같이 살펴보면 

  1. flux의 .subscribe를 만나면 타고타고 올라가서 onSubscribe를 먼저 실행하고
  2. request(Long.MAX) -> unbounded로 표현되어 있는 것이고
  3. onNext -> -> onComplete를 실행하는 것을 알 수 있다.

 

<map>

public static void main(String[] args){
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s*10)
    .log()
    .subscribe(System.out::println);
}
14   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
15   [main] INFO  reactor.Flux.Map.2  - onSubscribe(FluxMap.MapSubscriber)
15   [main] INFO  reactor.Flux.Map.2  - request(unbounded)
15   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
17   [main] INFO  reactor.Flux.Create.1  - onNext(1)
17   [main] INFO  reactor.Flux.Map.2  - onNext(10)
10   // sysout 결과
17   [main] INFO  reactor.Flux.Create.1  - onNext(2)
17   [main] INFO  reactor.Flux.Map.2  - onNext(20)
20   //sysout 결과
17   [main] INFO  reactor.Flux.Create.1  - onNext(3)
17   [main] INFO  reactor.Flux.Map.2  - onNext(30)
30   //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onComplete()
18   [main] INFO  reactor.Flux.Map.2  - onComplete()

로그를 위아래 걸어줬더니 위와 같은 로그가 찍혔다. Flux.Create. 로그는 위 log(), Flux.Map. 로그는 아래 log()라고 보면 된다. 위에서는 1이 넘어갔지만 map operator를 건너서 10이 되어 넘어가는 것을 볼 수 있다.

 

<reduce>

public static void main(String[] args){
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s*10)
    .log()
    .reduce(0, (a, b) -> a+b)
    .log()
    .subscribe(System.out::println);
}
45   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
46   [main] INFO  reactor.Flux.Map.2  - onSubscribe(FluxMap.MapSubscriber)
46   [main] INFO  reactor.Mono.ReduceSeed.3  - | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)
46   [main] INFO  reactor.Mono.ReduceSeed.3  - | request(unbounded)
46   [main] INFO  reactor.Flux.Map.2  - request(unbounded)
47   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
48   [main] INFO  reactor.Flux.Create.1  - onNext(1)
48   [main] INFO  reactor.Flux.Map.2  - onNext(10)
48   [main] INFO  reactor.Flux.Create.1  - onNext(2)
48   [main] INFO  reactor.Flux.Map.2  - onNext(20)
48   [main] INFO  reactor.Flux.Create.1  - onNext(3)
49   [main] INFO  reactor.Flux.Map.2  - onNext(30)
49   [main] INFO  reactor.Flux.Create.1  - onComplete()
49   [main] INFO  reactor.Flux.Map.2  - onComplete()
49   [main] INFO  reactor.Mono.ReduceSeed.3  - | onNext(60)
60   //sysout 결과
49   [main] INFO  reactor.Mono.ReduceSeed.3  - | onComplete()

reduce operator를 마지막에 추가하고 보면 매 숫자를 받아서(create) 10을 곱하고(map) onNext를 계속하다가 마지막에 오면(onComplete) reduce를 실행해서 최종적으로 60을 받는 것을 볼 수 있다.


위 1~3 과정을 거치고 reactor lib를 사용한 후 로그를 보니 왜 그렇게 로그가 지나가는지 이해하기가 쉬웠다. 왜 굳이 예전 방식으로 코딩을 하시는걸까 궁금했는데 기본적인 원리를 손으로 짜보고 나니까 흐름을 파악하는데 큰 도움이 되는 것 같다!

728x90
반응형