reactive programming과 관련하여 토비의 라이브 10강을 시청하고 요약정리하려고 한다.
1. Iterable vs Observable (duality)
- Iterable(pull) 방식: 사용하고자 하는 사람이 가져다가 쓴다.
- Iterable#next
public static void main(String[] args){
Iterable<Integer> iter = () ->
new Iterator<>() {
int i = 0;
final static int MAX = 10;
public boolean hasNext() {
return i < MAX;
}
public Integer next() {
return ++i;
}
};
for(Integer i : iter){ //for each
System.out.println(i);
}
for(Iterator<Integer> it = iter.iterator(); it.hasNext();){
System.out.println(it.next()); //pull 데이터를 꺼내자
}
}
- Observable(push) 방식: 데이터를 주면 그걸 받아서 처리하는 방법
- Observable#notifyObservers
@SuppressWarnings("deprecation")
static class IntObservable extends Observable implements Runnable{
@Override
public void run() {
for(int i=1; i<=10; i++){
setChanged(); //데이터 바뀌었으니 가져가라고 설정
notifyObservers(i); //push방식으로 옵저버에게 데이터를 주면
}
}
}
public static void main(String[] args){
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
//받아
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob); //리스너 등록
//io.run(); //실행
//옵저버 패턴은 별도의 스레드에서 작업하기 수월
ExecutorService es = Executors.newSingleThreadExecutor(); //따로 할당받은 스레드에서
es.execute(io); //run이 실행됨
System.out.println(Thread.currentThread().getName() + " DONE"); //메인 스레드에서 실행
es.shutdown(); //스레드 종료
}
추가) 위 내용에 대해 이해하기 위해서는 옵저버 패턴(Observer pattern)을 필수로 알아야 한다.
옵저버 패턴?
객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴이다.
한 객체의 상태가 바뀌면 그 객체에 의존하는 다른 객체들한테 연락이 가고, 자동으로 내용이 갱신되는 방식으로 일대다(one-to-many) 의존성을 정의한다.
위 내용에 맞게 스스로 관련 내용을 구현할 수 있지만 자바에서 제공하는 Observer 인터페이스와 Observable 클래스를 가지고 구현할 수도 있다.
Observable 클래스를 상속받은 게 "데이터 제공자" Observer 인터페이스를 구현한 게 "데이터 받는 자"이다.
참고로 Observable클래스는 Java9부터 deprecated라서 관련 경고를 없애기 위해 아래 어노테이션을 넣는다.
@SuppressWarnings("deprecation")
옵저버 패턴에서는 다음과 같은 이슈가 있다.
- complete? 마지막이 언제인지 알기 힘들다.
- error? 진행되다 익셉션이 나면? 전파되는 방식이나 에러는 어떻게 처리? fallback? 관련 아이디어가 패턴에 녹아져 있지 않다.
이에 나오게 된 사상이 reactive stream이다.
2. reactive stream(스펙)
reactive stream이란 non-blocking(논 블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙으로 아래와 같은 특징이 있다.
- 잠재적으로 무한한 양의 데이터 처리
- 순서대로 처리
- 데이터를 비동기적으로 전달
- backpressure를 이용한 데이터 흐름 제어
또한 4가지 컴포넌트로 구성되어 있는데 다음과 같다.
- publisher: 데이터 제공자(옵저버 패턴의observable)
- Publisher.subscribe(Subscriber)
- subscriber: 데이터를 받아서 사용하는 쪽(옵저버 패턴의 observer)
- onSubscribe, onNext ...
- subscription
- proessor
참고로 Java9에 추가된 Flow는 reactvie stream 스펙을 채택하여 사용하고 있다. reative stream 스펙에 대한 내용은 여기에서 확인하기로 하고 이제 소스를 살펴보자.
public static void main(String[] args) throws InterruptedException {
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService es = Executors.newSingleThreadExecutor();
Flow.Publisher p = new Flow.Publisher() {
@Override
public void subscribe(Flow.Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) { //몇 개줄까; 갯수만큼 받고 싶어
es.execute(() -> {
//스레스 한정 위반이라 n을 안에서 수정 못해, i를 별도로 설정
int i = 0;
try{
while(i++ < n){
if(it.hasNext()){
subscriber.onNext(it.next());
}else {
subscriber.onComplete();
break;
}
}
}catch (RuntimeException e){
subscriber.onError(e);
}
});
}
@Override
public void cancel() { //중간에 작업 취소 시킬 수 있음
}
});
}
};
Flow.Subscriber<Integer> s = new Flow.Subscriber<Integer>() {
Flow.Subscription subscription; //잠시 저장하자
@Override
public void onSubscribe(Flow.Subscription subscription) { //구독이 시작되었어
System.out.println(Thread.currentThread().getName() + " > onSubscribe ");
//request
this.subscription = subscription;
this.subscription.request(1); //Long.MAX = 다 내놔
}
@Override
public void onNext(Integer item) { //update; 데이터 주면 받아서 처리해; 끌어와 데이터를
System.out.println(Thread.currentThread().getName() + " > onNext " + item); //받을 준비 되었어
//버퍼사이즈, cpu 등에 따라 별도로 조절가능
this.subscription.request(1);//하나 줘
}
@Override
public void onError(Throwable throwable) { //에러가 나면 에러를 나 줘; 재시도 등 처리가능; 구독 끝
System.out.println(Thread.currentThread().getName() + " > onError " + throwable.getMessage());
}
@Override
public void onComplete() { //다 완료되었어; 구독 끝
System.out.println(Thread.currentThread().getName() + " > onComplete ");
}
};
p.subscribe(s); //s가 p를 리슨
// es.shutdown();
es.awaitTermination(10, TimeUnit.SECONDS);
es.shutdown();
}
소스 각 라인에 주석으로 설명을 달았으며 큰 흐름은 아래 그림을 보고 이해하면 된다.
여기서의 특징은 publisher와 subscriber 사이에 subscription이라는 데이터이자 중계자를 거쳐서 구독과 관련된 설정을 할 수 있다는 점이다.
reactive stream 표준 및 명세: https://www.reactive-streams.org/
reactive stream 설명: https://engineering.linecorp.com/en/blog/reactive-streams-armeria-1/
https://sabarada.tistory.com/98
'개발 > reactive' 카테고리의 다른 글
[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.03.18 |
---|---|
[reactive] 4-2. spring 비동기를 위한 interfaces/classes (0) | 2022.03.17 |
[reactive] 4-1. java Future/FutureTask/Callable/Runnable (0) | 2022.03.16 |
[reactive] 3. reactive streams - schedulers (0) | 2022.03.03 |
[reactive] 2. reactive streams - operators (0) | 2022.02.25 |