반응형

reactive programming과 관련하여 토비의 라이브 10강을 시청하고 요약정리하려고 한다.

 

1. Iterable vs Observable (duality)

  • Iterable(pull) 방식:   사용하고자 하는 사람이 가져다가 쓴다.
  • Iterable#next
public static void main(String[] args){
    Iterable<Integer> iter = () ->
            new Iterator<>() {
                int i = 0;
                final static int MAX = 10;

                public boolean hasNext() {
                    return i < MAX;
                }

                public Integer next() {
                    return ++i;
                }
            };

    for(Integer i : iter){ //for each
        System.out.println(i);
    }

    for(Iterator<Integer> it = iter.iterator(); it.hasNext();){
        System.out.println(it.next());  //pull 데이터를 꺼내자
    }
}
  • Observable(push) 방식: 데이터를 주면 그걸 받아서 처리하는 방법
  • Observable#notifyObservers
@SuppressWarnings("deprecation")
static class IntObservable extends Observable implements Runnable{
    @Override
    public void run() {
        for(int i=1; i<=10; i++){
            setChanged(); //데이터 바뀌었으니 가져가라고 설정
            notifyObservers(i); //push방식으로 옵저버에게 데이터를 주면
        }
    }
}

public static void main(String[] args){
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            //받아
            System.out.println(Thread.currentThread().getName() + " " + arg);
        }
    };
    IntObservable io = new IntObservable();
    io.addObserver(ob); //리스너 등록

    //io.run(); //실행

    //옵저버 패턴은 별도의 스레드에서 작업하기 수월
    ExecutorService es = Executors.newSingleThreadExecutor(); //따로 할당받은 스레드에서
    es.execute(io); //run이 실행됨
    System.out.println(Thread.currentThread().getName() + " DONE"); //메인 스레드에서 실행
    es.shutdown(); //스레드 종료
}

추가) 위 내용에 대해 이해하기 위해서는 옵저버 패턴(Observer pattern)을 필수로 알아야 한다.

옵저버 패턴? 
객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴이다.
한 객체의 상태가 바뀌면 그 객체에 의존하는 다른 객체들한테 연락이 가고, 자동으로 내용이 갱신되는 방식으로 일대다(one-to-many) 의존성을 정의한다.

위 내용에 맞게 스스로 관련 내용을 구현할 수 있지만 자바에서 제공하는 Observer 인터페이스와 Observable 클래스를 가지고 구현할 수도 있다.

Observable 클래스를 상속받은 게 "데이터 제공자" Observer 인터페이스를 구현한 게 "데이터 받는 자"이다.

참고로 Observable클래스는 Java9부터 deprecated라서 관련 경고를 없애기 위해 아래 어노테이션을 넣는다.

@SuppressWarnings("deprecation")

 

옵저버 패턴에서는 다음과 같은 이슈가 있다.

  1. complete? 마지막이 언제인지 알기 힘들다.
  2. error? 진행되다 익셉션이 나면? 전파되는 방식이나 에러는 어떻게 처리? fallback?  관련 아이디어가 패턴에 녹아져 있지 않다.

이에 나오게 된 사상이 reactive stream이다.

 

2. reactive stream(스펙)

reactive stream이란 non-blocking(논 블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙으로 아래와 같은 특징이 있다.

  1. 잠재적으로 무한한 양의 데이터 처리
  2. 순서대로 처리
  3. 데이터를 비동기적으로 전달
  4. backpressure를 이용한 데이터 흐름 제어

또한 4가지 컴포넌트로 구성되어 있는데 다음과 같다.

  1. publisher: 데이터 제공자(옵저버 패턴의observable)
    • Publisher.subscribe(Subscriber)
  2. subscriber: 데이터를 받아서 사용하는 쪽(옵저버 패턴의 observer)
    • onSubscribe, onNext ...
  3. subscription
  4. proessor

참고로 Java9에 추가된 Flow는 reactvie stream 스펙을 채택하여 사용하고 있다. reative stream 스펙에 대한 내용은 여기에서 확인하기로 하고 이제 소스를 살펴보자.

public static void main(String[] args) throws InterruptedException {
    Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
    ExecutorService es = Executors.newSingleThreadExecutor();

    Flow.Publisher p = new Flow.Publisher() {
        @Override
        public void subscribe(Flow.Subscriber subscriber) {
            Iterator<Integer> it = itr.iterator();

            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //몇 개줄까; 갯수만큼 받고 싶어
                    es.execute(() -> {
                    //스레스 한정 위반이라 n을 안에서 수정 못해, i를 별도로 설정
                        int i = 0;
                        try{
                            while(i++ < n){
                                if(it.hasNext()){
                                    subscriber.onNext(it.next());
                                }else {
                                    subscriber.onComplete();
                                    break;
                                }
                            }
                        }catch (RuntimeException e){
                            subscriber.onError(e);
                        }
                    });
                }

                @Override
                public void cancel() { //중간에 작업 취소 시킬 수 있음

                }
            });
        }
    };

    Flow.Subscriber<Integer> s = new Flow.Subscriber<Integer>() {
        Flow.Subscription subscription; //잠시 저장하자
        @Override
        public void onSubscribe(Flow.Subscription subscription) { //구독이 시작되었어
            System.out.println(Thread.currentThread().getName() + " > onSubscribe ");
            //request
            this.subscription = subscription;
            this.subscription.request(1); //Long.MAX = 다 내놔
        }

        @Override
        public void onNext(Integer item) { //update; 데이터 주면 받아서 처리해; 끌어와 데이터를
            System.out.println(Thread.currentThread().getName() + " > onNext " + item); //받을 준비 되었어
            //버퍼사이즈, cpu 등에 따라 별도로 조절가능
            this.subscription.request(1);//하나 줘
        }

        @Override
        public void onError(Throwable throwable) { //에러가 나면 에러를 나 줘; 재시도 등 처리가능; 구독 끝
            System.out.println(Thread.currentThread().getName() + " > onError " + throwable.getMessage());
        }

        @Override
        public void onComplete() { //다 완료되었어; 구독 끝
            System.out.println(Thread.currentThread().getName() + " > onComplete ");
        }
    };

    p.subscribe(s); //s가 p를 리슨
   // es.shutdown();
    es.awaitTermination(10, TimeUnit.SECONDS);
    es.shutdown();
}

소스 각 라인에 주석으로 설명을 달았으며 큰 흐름은 아래 그림을 보고 이해하면 된다.

reactive stream from line

여기서의 특징은 publisher와 subscriber 사이에 subscription이라는 데이터이자 중계자를 거쳐서 구독과 관련된 설정을 할 수 있다는 점이다.

 


reactive stream 표준 및 명세: https://www.reactive-streams.org/

 

https://www.reactive-streams.org/

Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. JDK9 java

www.reactive-streams.org

reactive stream 설명: https://engineering.linecorp.com/en/blog/reactive-streams-armeria-1/

 

Let's Play with Reactive Streams on Armeria - Part 1 - LINE ENGINEERING

What is Reactive Streams? In this post, I'd like to introduce the basic concept of Reactive Streams, and how to use Reactive Streams with Armeria, the

engineering.linecorp.com

https://sabarada.tistory.com/98

 

[Java] Reactive Stream 이란?

reactive stream이란 non-blocking(넌블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. java의 RxJava, Spring5 Webflux의 Core에 있는 ProjectReactor 프로젝트 모두 해당..

sabarada.tistory.com

 

728x90
반응형

+ Recent posts