반응형

reactor를 공부하다 보면 Mono/Flux 이후에 많이 보게 되는 게 Sinks이다.. 솔직히 아직 mono와 flux도 완벽하게 이해하지 못했는데, 자꾸 새로운 개념이 튀어나오는 게 두렵긴 하지만 ㅋㅋ 어쨌건 계속 봐야 모래알 같은 지식이 쌓여 해변가가 될 것이라 믿기에, 짧은 지식으로 나마 원문을 파보도록 한다.

java doc에 기재되어 있는 Sinks의 개념이다. mono/flux보다 한 단계 더 내부에서 실제 동작하는 시그널의 구조체 같은 느낌인 듯 한데, 저것만 봐서 완전히 와닿지 않는다.

https://projectreactor.io/docs/core/release/reference/#processors

 

Reactor 3 Reference Guide

10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3) 1

projectreactor.io

기존에 processor라는 개념이 있는데, publisher이면서 subscriber의 역할을 하는 친구이다. 보통 reactive stream의 중간체(데이터를 구독받았다가 다시 방출하는..)를 가리키는데 reactive stream에서는 publisher로 보통 구현한다(이때 subscriber 인터페이스의 함수를 직접 호출하지 않도록 조심해야 한다).

 In Reactor a sink is a class that allows safe manual triggering of signals.

Sinks는 안전하게 event trigger를 할 수 있게 하는 interface며 processor의 대용으로 사용할 수 있다.

 

 

1. 일반 구독(flux.subscribe)

@Test
@DisplayName("해당 구독행위는 array에 대해서 하였기에 뒤에서 변화가 이루어진 부분에 대해서는 이미 끝난 행위라서 아무런 변화가 없음")
public void flux(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data)); //구독
    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터 변화
}
[a, b, c, d, e]

구독 이후에 발생한 데이터의 변화는 감지하지 않는다. 데이터가 정해져있고 거기에 구독자를 추가한다.

 

2. subscribe를 계속 감지하게 하려면? Processor 사용

Flux의 Processor에는 FluxProcessor, EmitterProcessor, ReplayProcessor 등 많은 프로세서들이 존재하는데, 그중 EmitterProcessor는 여러 구독자(subscriber)가 사용할 수 있는 구독과 발행이 동시해 일어나는 프로세서이다.

EmitterProcessor: An implementation of a message-passing Processor implementing publish-subscribe with synchronous (thread-stealing and happen-before interactions) drain loops.

또한, 구독 행위가 등록되고 난 이후에 해당 이벤트가 발생하면 구독하는 대상에게 데이터를 동기적으로 전달한다.

 

EmitterProcessor를 사용한 코드

@Test
public void beforeSink(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e"}));

    //프로세서 시작 구간.
    EmitterProcessor<List<String>> data = EmitterProcessor.create();  //발행인
    data.subscribe(t -> System.out.println("1번 : "+t));  //구독자 추가
    FluxSink<List<String>> sink = data.sink();   //배달부
    sink.next(array); //배달 완료

    array.addAll(Arrays.asList(new String[]{"new", "data", "hello"}));  //내용 추가

    data.subscribe(t -> System.out.println("2번 : "+t));  //구독자 추가
    sink.next(array);  //배달

    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //내용 추가
    sink.next(array);  //배달
}
1번 : [a, b, c, d, e]
1번 : [a, b, c, d, e, new, data, hello]
2번 : [a, b, c, d, e, new, data, hello]
1번 : [a, b, c, d, e, new, data, hello, 1, 2, 3]
2번 : [a, b, c, d, e, new, data, hello, 1, 2, 3]

Processor는 기존에 있거나 새롭게 등장한 구독자(subscriber)에게 데이터(old data)를 전달한다. 구독자 중심이랄까. 

발행하는 기관(processor)을 건설하고 구독자를 모집(subscribe)한 뒤에 계속해서 발행(sink.next)하는 형태이다. 구독 이후에 데이터가 변경되어 발행되었다면 추가 구독없이 변경된 데이터를 받아볼 수 있다. 

작업 할 내용이 데이터가 중심이면 1번의 일반 구독형태를 사용하고, 구독자가 중심이면 프로세스를 사용하면 될 것 같다.

 

3. EmitterProcessor 부분이 reactor 3.5에서 deprecated 돼서 Sinks로 바꿔본다.

@Test
@DisplayName("flux: hot 테스트")
public void sinkTest_multicast(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e"})); //내용

    Sinks.Many<List<String>> sink = Sinks.many().multicast().directBestEffort(); //발행인
    sink.asFlux().subscribe(data -> System.out.println("1번 : " + data)); //구독자 추가
    sink.tryEmitNext(array); //발행함
    sink.asFlux().subscribe(data -> System.out.println("2번 : " + data)); //구독자 추가
    array.addAll(Arrays.asList(new String[]{"1", "2", "3", "4", "5"})); //내용 추가
    sink.tryEmitNext(array); //발행함
    sink.asFlux().subscribe(data -> System.out.println("3번 : " + data)); //구독자 추가
}
1번 : [a, b, c, d, e]
1번 : [a, b, c, d, e, 1, 2, 3, 4, 5]
2번 : [a, b, c, d, e, 1, 2, 3, 4, 5]

1번은 한번 구독으로 변경된 데이터도 다시 받았고, 2번은 구독 이후 데이터를 받았다. 3번은 구독(subscribe) 이후에 아무 데이터 변경이 없어서 로그에 남지 않는다.

 


 

아래 예제를 따라하면 Sink의 multicast/unicast의 차이, sinks.one/sinks.many를 공부할 수 있다. hot/cold publisher에 대한 개념은 덤!

https://prateek-ashtikar512.medium.com/projectreactor-sinks-bac6c88e5e69

 

ProjectReactor — Sinks

public static <T> Sinks.One<T> one() A Sinks.One that works like a conceptual promise: it can be completed with or without a value at any…

prateek-ashtikar512.medium.com

 

@Test
@DisplayName("n subscribers :: 1 message")
public void sinkOne(){
    Sinks.One<Object> sink = Sinks.one(); //n subscribers :: 1 message
    Mono<Object> mono = sink.asMono();

//        mono.subscribe(d -> System.out.println("Sam: " + d));
    mono.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); //많이 받겠다고 해고 sinks.one 자체가 1개만 방출해서 하나만 받음
            System.out.println(">>>> onSubscribed " + s); // 이거 호출되고..
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });
    mono.subscribe(d -> System.out.println("Sam: " + d));

    sink.tryEmitValue("Hollo");
    sink.tryEmitValue("hi~"); //안받음! one이라서.. 하나만받음
}

@Test
@DisplayName("1 subscriber :: n message")
public void unicast(){
    Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

//        flux.subscribe(d-> System.out.println("Date : "+ d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
            System.out.println(">>>> onSubscribed " + s); // 이거 호출되고..
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });
    flux.subscribe(d-> System.out.println("Date : "+ d));

    sink.tryEmitNext("hi");
    sink.tryEmitNext("i am hungry");
    sink.tryEmitNext("bye");
}

@Test
@DisplayName("sink.many는 갑자기 많은걸 방출하면 FAIL_NON_SERIALIZED 에러가 나서 이때는 재시도를 해줘야 데이터 누수가 없음")
public void unicast2() throws InterruptedException {
    Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

    List<Object> list = new ArrayList<>();
    flux.subscribe(e -> list.add(e));

    for(int i = 0; i< 1000; i++){
        final int j = i;
        CompletableFuture.runAsync(() -> {
            sink.emitNext(j, new Sinks.EmitFailureHandler() {
                @Override
                public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
                    System.out.println(emitResult.toString());
                    //return true; // true if the operation should be retried, false otherwise.
                    // Sinks.many() factory methods that fail with EmitResult.FAIL_NON_SERIALIZED when multiple producers emit concurrently
                    //그래서 FAIL_NON_SERIALIZED 에러일 때는 재시도하도록 해줘야 함
                    return emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
                }
            });
        });
    }

    Thread.sleep(5_000);
    System.out.println(list.size());
}

@Test
@DisplayName("n subscribers :: n message / 구독 이후 발행한 메세지부터 받음 / hot")
public void multicast(){
    Sinks.Many<Object> sink = Sinks.many().multicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

    sink.tryEmitNext("hi");
    sink.tryEmitNext("hello");

    flux.subscribe(d -> System.out.println("SAM: " + d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(5); //총 가져올 갯수 1이면 sink.one과 같은 효과
            System.out.println(">>>> onSubscribed "); // 이거 호출되고 이제부터 데이터 받음
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o); //실 데이터
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });

    sink.tryEmitNext("????");
    flux.subscribe(d-> System.out.println("here: " +d));
    sink.tryEmitNext(" new");
}

@Test
@DisplayName("n subscribers :: n message / 구독 이전 발행 한 메세지도 받음 / cold")
public void manyReplay(){
    Sinks.Many<Object> sink = Sinks.many().replay().all(); //onSubscribe 하고 바로 첨부터하고 그담부터는 리스닝
    Flux<Object> flux = sink.asFlux();

    sink.tryEmitNext("hi");
    sink.tryEmitNext("how are you");

    flux.subscribe(d -> System.out.println("First: " + d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); //총 가져올 갯수 1이면 sink.one과 같은 효과
            System.out.println(">>>> onSubscribed "); // 이거 호출되고 이제부터 데이터 받음
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o); //실 데이터
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });

    sink.tryEmitNext("???");
    flux.subscribe(d -> System.out.println("Demon: " +d));
    sink.tryEmitNext("new");
}
728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webclient] 비슷한데 뭘 써야할지 모르겠는 것들  (0) 2022.04.01
[webflux] block vs toFuture  (0) 2022.03.31
[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25

+ Recent posts