728x90
반응형
728x90
반응형
반응형

오늘은 두 번째 강의이다.

 

operator라고 publisher와 subscriber사이에서 데이터 변형을 도와주는 것을 만들어본다.

publisher -> [data1] -> operator1 -> [data2] -> oper2 -> [data3] -> subscriber

이렇게 operator1은 publisher에게는 subscriber, oper2에게는 publisher의 역할을 하게 된다. 

 

1. map (1-1 mapping) 작성

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    Flow.Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10); //d2로 만든다
    Flow.Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer, Integer>) s -> -s); //d3로 만든다
    map2Pub.subscribe(logSub());
}

private static Flow.Publisher<Integer> mapPub(Flow.Publisher<Integer> pub, Function<Integer, Integer> f) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) { //logSub
            pub.subscribe(new DelegateSub(subscriber) {
                @Override
                public void onNext(Integer item) {
                    subscriber.onNext(f.apply(item));
                }
            });
        }
    };
}

private static Flow.Subscriber<Integer> logSub() {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(Integer item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            //반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //갯수는 신경쓰지 말자
                    try{
                        iter.forEach(s -> subscriber.onNext(s));
                        subscriber.onComplete(); //다 보내면 완료를 알려줘야함
                    }catch (Throwable t){
                        subscriber.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    };
}

위 코드에서 흐름은 아래와 같다. pub -> logSub로 데이터가 흘러가는걸 downstream이라고 하고 반대 방향으로 올라오는 것을 upstream이라고 부른다.

pub -> [data1] -> mapPub -> [data2] -> logSub
                        <- subscribe(logSub)
                        -> onSubscribe(s)
                        -> onNext
                        -> onNext
                        -> onComplete

일대일 매핑 방식이라 (1-> 10; 2-> 20; ...) mapSub의 onNext에서 매번 logSub의 onNext를 불러 데이터를 전송하는 것을 알 수 있다.

 

2. reduce 작성

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    //Flow.Publisher<Integer> sumPub = sumPub(pub); //합계
    //sumPub.subscribe(logSub());

    Flow.Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer, Integer, Integer>)(a, b) -> a+b ); //합계
    reducePub.subscribe(logSub());
}

//1, 2, 3, 4, 5
// 0 -> (0, 1) -> 1
// 1 -> (1, 2) -> 3
// 3 -> (3, 3) -> 6
// ...
private static Flow.Publisher<Integer> reducePub(Flow.Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber){
                int result = init;
                @Override
                public void onNext(Integer item) {
                    result = bf.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}

private static Flow.Publisher<Integer> sumPub(Flow.Publisher<Integer> pub) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber){
                int sum = 0;
                @Override
                public void onNext(Integer item) {
                    sum += item;
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(sum);
                    subscriber.onComplete();
                }
            });
        }
    };
}

private static Flow.Subscriber<Integer> logSub() {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(Integer item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

private static Flow.Publisher<Integer> iterPub(List<Integer> iter) {
    return new Flow.Publisher<Integer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            //반드시 호출해줘야 함, subscription = 구독이 담고 있는 액션
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //갯수는 신경쓰지 말자
                    try{
                        iter.forEach(s -> subscriber.onNext(s));
                        subscriber.onComplete(); //다 보내면 완료를 알려줘야함
                    }catch (Throwable t){
                        subscriber.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    };
}

합계와 같이 n개의 데이터가 하나의 데이터로 흡수되어 내려갈 때에는 onNext에서는 필요한 계산을 하고 onComplete에서 부모의 onNext를 불러 데이터를 한 번만 전송해주고, onComplete를 불러 더 이상 데이터가 없음을 알려준다.

 

3. generic으로 변환

public static void main(String[] args){
    List<Integer> list = Stream.iterate(1, i -> i+1) //무한으로 계속 만듦
            .limit(10) //제한을 줘야함
            .collect(Collectors.toList());

    Flow.Publisher<Integer> pub = iterPub(list);

    Flow.Publisher<String> mapPub = mapPub(pub, (Function<Integer, String>) s -> "[" +s+ "]"); //d2로 만든다
    mapPub.subscribe(logSub());

    //Flow.Publisher<String> reducePub = reducePub2(pub, "", (BiFunction<String, Integer, String>)(a, b) -> a + "-" + b );
    Flow.Publisher<StringBuilder> reducePub = reducePub2(pub, new StringBuilder(), (a, b) -> a .append(b+","));
    reducePub.subscribe(logSub());

}

//generic으로 전환
//publisher가 발행한 T(int) -> R(string)으로 전환해주는 매개체(pub이자 sub)
private static <T, R> Flow.Publisher<R> mapPub(Flow.Publisher<T> pub, Function<T, R> f) {
    return new Flow.Publisher<R>() {
        @Override
        public void subscribe(Flow.Subscriber<? super R> subscriber) { //logSub
            pub.subscribe(new DelegateSub<T, R>(subscriber) {
                @Override
                public void onNext(T item) {
                    subscriber.onNext(f.apply(item));
                }
            });
        }
    };
}

private static <T, R> Flow.Publisher<R> reducePub2(Flow.Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
    return new Flow.Publisher<R>() {
        @Override
        public void subscribe(Flow.Subscriber<? super R> subscriber) {
            pub.subscribe(new DelegateSub<T, R>(subscriber){
                R result = init;

                @Override
                public void onNext(T item) {
                    result = bf.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}

//publisher가 발행한 숫자를 변환한 최종 값(string)을 찍기만 하는 subscriber
private static <T> Flow.Subscriber<T> logSub() {
    return new Flow.Subscriber<T>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe!");
            subscription.request(Long.MAX_VALUE); //무제한으로 다 줘
            //subscription.cancel(); //어떠한 이유로 받는 사람이 데이터가 필요 없어질 때
        }

        @Override
        public void onNext(T item) { //정상 데이터
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) { //익셉션을 던지는게 아니라 받아서 처리
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() { //완료 시그널
            log.debug("onComplete!");
        }
    };
}

 

4. reactor 라이브러리 사용

위에서 손으로 짠 코드는 사실 reactor lib를 사용하면 손쉽게 작성할 수 있다.

implementation 'io.projectreactor:reactor-core:3.4.15'

위 dependency를 추가하고 아래와 같이 진행한다.

 

public static void main(String[] args){
    Flux.create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .subscribe(System.out::println);
}
//로그
0    [main] DEBUG reactor.util.Loggers  - Using Slf4j logging framework
15   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
16   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
18   [main] INFO  reactor.Flux.Create.1  - onNext(1)
1    //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onNext(2)
2    //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onNext(3)
3    //sysout 결과
19   [main] INFO  reactor.Flux.Create.1  - onComplete()

로그(log())를 같이 살펴보면 

  1. flux의 .subscribe를 만나면 타고타고 올라가서 onSubscribe를 먼저 실행하고
  2. request(Long.MAX) -> unbounded로 표현되어 있는 것이고
  3. onNext -> -> onComplete를 실행하는 것을 알 수 있다.

 

<map>

public static void main(String[] args){
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s*10)
    .log()
    .subscribe(System.out::println);
}
14   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
15   [main] INFO  reactor.Flux.Map.2  - onSubscribe(FluxMap.MapSubscriber)
15   [main] INFO  reactor.Flux.Map.2  - request(unbounded)
15   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
17   [main] INFO  reactor.Flux.Create.1  - onNext(1)
17   [main] INFO  reactor.Flux.Map.2  - onNext(10)
10   // sysout 결과
17   [main] INFO  reactor.Flux.Create.1  - onNext(2)
17   [main] INFO  reactor.Flux.Map.2  - onNext(20)
20   //sysout 결과
17   [main] INFO  reactor.Flux.Create.1  - onNext(3)
17   [main] INFO  reactor.Flux.Map.2  - onNext(30)
30   //sysout 결과
18   [main] INFO  reactor.Flux.Create.1  - onComplete()
18   [main] INFO  reactor.Flux.Map.2  - onComplete()

로그를 위아래 걸어줬더니 위와 같은 로그가 찍혔다. Flux.Create. 로그는 위 log(), Flux.Map. 로그는 아래 log()라고 보면 된다. 위에서는 1이 넘어갔지만 map operator를 건너서 10이 되어 넘어가는 것을 볼 수 있다.

 

<reduce>

public static void main(String[] args){
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s*10)
    .log()
    .reduce(0, (a, b) -> a+b)
    .log()
    .subscribe(System.out::println);
}
45   [main] INFO  reactor.Flux.Create.1  - onSubscribe(FluxCreate.BufferAsyncSink)
46   [main] INFO  reactor.Flux.Map.2  - onSubscribe(FluxMap.MapSubscriber)
46   [main] INFO  reactor.Mono.ReduceSeed.3  - | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)
46   [main] INFO  reactor.Mono.ReduceSeed.3  - | request(unbounded)
46   [main] INFO  reactor.Flux.Map.2  - request(unbounded)
47   [main] INFO  reactor.Flux.Create.1  - request(unbounded)
48   [main] INFO  reactor.Flux.Create.1  - onNext(1)
48   [main] INFO  reactor.Flux.Map.2  - onNext(10)
48   [main] INFO  reactor.Flux.Create.1  - onNext(2)
48   [main] INFO  reactor.Flux.Map.2  - onNext(20)
48   [main] INFO  reactor.Flux.Create.1  - onNext(3)
49   [main] INFO  reactor.Flux.Map.2  - onNext(30)
49   [main] INFO  reactor.Flux.Create.1  - onComplete()
49   [main] INFO  reactor.Flux.Map.2  - onComplete()
49   [main] INFO  reactor.Mono.ReduceSeed.3  - | onNext(60)
60   //sysout 결과
49   [main] INFO  reactor.Mono.ReduceSeed.3  - | onComplete()

reduce operator를 마지막에 추가하고 보면 매 숫자를 받아서(create) 10을 곱하고(map) onNext를 계속하다가 마지막에 오면(onComplete) reduce를 실행해서 최종적으로 60을 받는 것을 볼 수 있다.


위 1~3 과정을 거치고 reactor lib를 사용한 후 로그를 보니 왜 그렇게 로그가 지나가는지 이해하기가 쉬웠다. 왜 굳이 예전 방식으로 코딩을 하시는걸까 궁금했는데 기본적인 원리를 손으로 짜보고 나니까 흐름을 파악하는데 큰 도움이 되는 것 같다!

728x90
반응형
반응형

reactive programming과 관련하여 토비의 라이브 10강을 시청하고 요약정리하려고 한다.

 

1. Iterable vs Observable (duality)

  • Iterable(pull) 방식:   사용하고자 하는 사람이 가져다가 쓴다.
  • Iterable#next
public static void main(String[] args){
    Iterable<Integer> iter = () ->
            new Iterator<>() {
                int i = 0;
                final static int MAX = 10;

                public boolean hasNext() {
                    return i < MAX;
                }

                public Integer next() {
                    return ++i;
                }
            };

    for(Integer i : iter){ //for each
        System.out.println(i);
    }

    for(Iterator<Integer> it = iter.iterator(); it.hasNext();){
        System.out.println(it.next());  //pull 데이터를 꺼내자
    }
}
  • Observable(push) 방식: 데이터를 주면 그걸 받아서 처리하는 방법
  • Observable#notifyObservers
@SuppressWarnings("deprecation")
static class IntObservable extends Observable implements Runnable{
    @Override
    public void run() {
        for(int i=1; i<=10; i++){
            setChanged(); //데이터 바뀌었으니 가져가라고 설정
            notifyObservers(i); //push방식으로 옵저버에게 데이터를 주면
        }
    }
}

public static void main(String[] args){
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            //받아
            System.out.println(Thread.currentThread().getName() + " " + arg);
        }
    };
    IntObservable io = new IntObservable();
    io.addObserver(ob); //리스너 등록

    //io.run(); //실행

    //옵저버 패턴은 별도의 스레드에서 작업하기 수월
    ExecutorService es = Executors.newSingleThreadExecutor(); //따로 할당받은 스레드에서
    es.execute(io); //run이 실행됨
    System.out.println(Thread.currentThread().getName() + " DONE"); //메인 스레드에서 실행
    es.shutdown(); //스레드 종료
}

추가) 위 내용에 대해 이해하기 위해서는 옵저버 패턴(Observer pattern)을 필수로 알아야 한다.

옵저버 패턴? 
객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴이다.
한 객체의 상태가 바뀌면 그 객체에 의존하는 다른 객체들한테 연락이 가고, 자동으로 내용이 갱신되는 방식으로 일대다(one-to-many) 의존성을 정의한다.

위 내용에 맞게 스스로 관련 내용을 구현할 수 있지만 자바에서 제공하는 Observer 인터페이스와 Observable 클래스를 가지고 구현할 수도 있다.

Observable 클래스를 상속받은 게 "데이터 제공자" Observer 인터페이스를 구현한 게 "데이터 받는 자"이다.

참고로 Observable클래스는 Java9부터 deprecated라서 관련 경고를 없애기 위해 아래 어노테이션을 넣는다.

@SuppressWarnings("deprecation")

 

옵저버 패턴에서는 다음과 같은 이슈가 있다.

  1. complete? 마지막이 언제인지 알기 힘들다.
  2. error? 진행되다 익셉션이 나면? 전파되는 방식이나 에러는 어떻게 처리? fallback?  관련 아이디어가 패턴에 녹아져 있지 않다.

이에 나오게 된 사상이 reactive stream이다.

 

2. reactive stream(스펙)

reactive stream이란 non-blocking(논 블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙으로 아래와 같은 특징이 있다.

  1. 잠재적으로 무한한 양의 데이터 처리
  2. 순서대로 처리
  3. 데이터를 비동기적으로 전달
  4. backpressure를 이용한 데이터 흐름 제어

또한 4가지 컴포넌트로 구성되어 있는데 다음과 같다.

  1. publisher: 데이터 제공자(옵저버 패턴의observable)
    • Publisher.subscribe(Subscriber)
  2. subscriber: 데이터를 받아서 사용하는 쪽(옵저버 패턴의 observer)
    • onSubscribe, onNext ...
  3. subscription
  4. proessor

참고로 Java9에 추가된 Flow는 reactvie stream 스펙을 채택하여 사용하고 있다. reative stream 스펙에 대한 내용은 여기에서 확인하기로 하고 이제 소스를 살펴보자.

public static void main(String[] args) throws InterruptedException {
    Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
    ExecutorService es = Executors.newSingleThreadExecutor();

    Flow.Publisher p = new Flow.Publisher() {
        @Override
        public void subscribe(Flow.Subscriber subscriber) {
            Iterator<Integer> it = itr.iterator();

            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { //몇 개줄까; 갯수만큼 받고 싶어
                    es.execute(() -> {
                    //스레스 한정 위반이라 n을 안에서 수정 못해, i를 별도로 설정
                        int i = 0;
                        try{
                            while(i++ < n){
                                if(it.hasNext()){
                                    subscriber.onNext(it.next());
                                }else {
                                    subscriber.onComplete();
                                    break;
                                }
                            }
                        }catch (RuntimeException e){
                            subscriber.onError(e);
                        }
                    });
                }

                @Override
                public void cancel() { //중간에 작업 취소 시킬 수 있음

                }
            });
        }
    };

    Flow.Subscriber<Integer> s = new Flow.Subscriber<Integer>() {
        Flow.Subscription subscription; //잠시 저장하자
        @Override
        public void onSubscribe(Flow.Subscription subscription) { //구독이 시작되었어
            System.out.println(Thread.currentThread().getName() + " > onSubscribe ");
            //request
            this.subscription = subscription;
            this.subscription.request(1); //Long.MAX = 다 내놔
        }

        @Override
        public void onNext(Integer item) { //update; 데이터 주면 받아서 처리해; 끌어와 데이터를
            System.out.println(Thread.currentThread().getName() + " > onNext " + item); //받을 준비 되었어
            //버퍼사이즈, cpu 등에 따라 별도로 조절가능
            this.subscription.request(1);//하나 줘
        }

        @Override
        public void onError(Throwable throwable) { //에러가 나면 에러를 나 줘; 재시도 등 처리가능; 구독 끝
            System.out.println(Thread.currentThread().getName() + " > onError " + throwable.getMessage());
        }

        @Override
        public void onComplete() { //다 완료되었어; 구독 끝
            System.out.println(Thread.currentThread().getName() + " > onComplete ");
        }
    };

    p.subscribe(s); //s가 p를 리슨
   // es.shutdown();
    es.awaitTermination(10, TimeUnit.SECONDS);
    es.shutdown();
}

소스 각 라인에 주석으로 설명을 달았으며 큰 흐름은 아래 그림을 보고 이해하면 된다.

reactive stream from line

여기서의 특징은 publisher와 subscriber 사이에 subscription이라는 데이터이자 중계자를 거쳐서 구독과 관련된 설정을 할 수 있다는 점이다.

 


reactive stream 표준 및 명세: https://www.reactive-streams.org/

 

https://www.reactive-streams.org/

Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. JDK9 java

www.reactive-streams.org

reactive stream 설명: https://engineering.linecorp.com/en/blog/reactive-streams-armeria-1/

 

Let's Play with Reactive Streams on Armeria - Part 1 - LINE ENGINEERING

What is Reactive Streams? In this post, I'd like to introduce the basic concept of Reactive Streams, and how to use Reactive Streams with Armeria, the

engineering.linecorp.com

https://sabarada.tistory.com/98

 

[Java] Reactive Stream 이란?

reactive stream이란 non-blocking(넌블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. java의 RxJava, Spring5 Webflux의 Core에 있는 ProjectReactor 프로젝트 모두 해당..

sabarada.tistory.com

 

728x90
반응형
반응형

JVM (Java Virtual Machine)

  • 자바 가상 머신으로 자바 바이트 코드(.class 파일)를 OS에 특화된 코드로 변환(인터프리터와 JIT 컴파일러)하여 실행
  • 바이트 코드를 실행하는 표준(JVM 자체는 표준)이자 구현체(특정 밴더가 구현한 JVM)
  • 특정 플랫폼에 종속적

JRE (Java Runtime Environment): JVM + 라이브러리

  • 자바 애플리케이션을 실행할 수 있도록 구성된 배포판
  • JVM과 핵심 라이브러리 및 자바 런타임 환경에서 사용하는 프로퍼티 세팅이나 리소스 파일을 가지고 있음
  • 개발 관련 도구(JDK)는 포함하지 않음

JDK (Java Development Kit): JRE + 개발 툴

  • JRE + 개발에 필요할 툴
  • 소스 코드를 작성할 때 사용하는 자바 언어는 플랫폼에 독립적
  • 오라클은 자바 11부터는 JDK만 제공하며 JRE를 따로 제공하지 않음
  • Write Once Run Anywhere

Java

  • 프로그래밍 언어
  • JDK에 들어있는 자바 컴파일러(javac)를 사용하여 바이트코드(.class 파일)로 컴파일
  • 자바 유료화? 오라클에서 만든 Oracle JDK 11 버전부터/ 상용으로 사용할 때 유료이며 나머지는 무료

JVM 구조

jvm

Class Loader

  • .class에 있는 바이트코드를 읽고 메모리에 저장
  • loading: 클래스 읽어오는 과정
  • linking: 레퍼런스를 연결하는 과정
  • initialization: static 값들 초기화 및 변수에 할당

메모리

  • 메소드 영역에는 클래스 수준의 정보(클래스 이름, 부모 클래스 이름, 메소드, 변수) 저장
  • 힙 영역에는 객체를 저장
  • 스택 영역에는 스레드 마다 런타임 스택을 만들고, 그 안에 메소드 호출을 스택 프레임이라 부르는 블록으로 쌓음. 스레드 종료하면 런타임 스택도 삭제됨
  • PC(Program Counter) 레지스터: 스레드 마다 스레드 내 현재 실행할 instruction의 위치를 가리키는 포인터가 생성됨
  • 네이티브 메소드 스택: Native Method를 호출하는 코드를 수행하기 위한 스택

Execution engine

  • interpreter: 바이트 코드를 한 줄씩 실행
  • JIT compiler: 인터프리터 효율을 높이기 위해, 인터프리터가 반복되는 코드를 발견하면 JIT 컴파일러는 반복되는 코드를 모두 네이티브 코드로 바꿈. 그다음부터 인터프리터는 네이티브 코드로 컴파일된 코드를 바로 사용할 수 있도록 지원
  • GC(Garbage Collector): 더이상 참조되지 않는 객체를 모아서 주기적으로 정리하는 프로그램

JNI(Java Native Interface)

  • 자바 애플리케이션에서 C, C++, 어셈블리로 작성된 함수를 사용할 수 있게 함
  • native 키워드를 사용한 메소드 호출

Native Method Library

  • C, C++로 작성된 라이브러리

 

garbage?

  • 주소를 잃어버려서 사용할 수 없는 메모리, 정리되지 않은 메모리, dangling object

garbage collection(GC)?

  • garbage의 메모리 해제; JVM이 한가할 때(idle) 혹은 메모리가 부족해져 OS에게 추가로 메모리를 할당해달라고 요청할 때 실행

 

JVM의 Heap영역의 대전제

  • 대부분의 객체는 대부분 일회성이며 금방 접근 불가능한 상태(Unreachable)가 된다.
  • 오래된 객체에서 새로운 객체로의 참조는 아주 적게 존재한다.
  • 생존 기간에 따라 heap을 두 가지 영역으로 쪼갬(Young, Old 영역) - 초기에는 Perm 영역이 존재하였지만 Java8부터 제거됨

 

minor gc?

  • 새롭게 생성된 객체가 할당(Allocation)되는 Young 영역에 대한 가비지 컬렉션(Garbage Collection)
  • 대부분의 객체가 금방 Unreachable 상태가 되기 때문에, 많은 객체가 Young영역에 생성되었다가 사라짐
    • The Young Generation is further divided into three spaces: Eden space and two Survivor spaces (S0 and S1).
  • eden 영역이 꽉 찬 경우 발생, 빠름
    • eden 영역에서 사용되지 않아진 객체의 메모리 해제
    • eden 영역에서 살아남은 객체는 s1/s2로 이동(s1이 꽉차면 s2, vice versa)
       

major gc/full gc?

  • minor gc속에서 살아남은 객체가 복사되는 Old 영역에 대한 가비지 컬렉션(Garbage Collection)
  • 복사되는 과정에서 대부분 Young 영역보다 크게 할당되며, 크기가 큰 만큼 가비지는 적게 발생함
  • old 영역이 꽉 찬 경우 발생, 느림

 

구체적으로 어떻게?

  • Stop the world(STW): GC를 실행하기 위해 JVM이 어플리케이션의 실행을 멈추는 작업
    • GC를 실행하는 스레드를 제외한 모든 스레드의 작업이 중단되고, GC가 완료되면 작업 재개됨
    • GC옵션이나 알고리즘으로 STW시간을 단축할 수 있음
  • Mark and Sweep: 
    • Mark: 사용되는 메모리와 사용되지 않는 메모리를 식별하는 작업
    • Sweep: Mark 단계에서 사용되지 않음으로 식별된 메모리를 해제하는 작업
    • Stop The World를 통해 모든 작업을 중단시키면, GC는 스택의 모든 변수 또는 Reachable 객체를 스캔하면서 각각이 어떤 객체를 참고하고 있는지를 탐색하며 mark and sweep 작업을 진행함

algorithm

java11 default gc

-XX:+UseG1GC

힙을 격자/작은 지역으로 쪼개서 특정 지역별로 gc가 일어남.

그래서 예측가능하고 정지시간이 짧음,

성능이 좋고 메모리 효율이 좋음,

gc처리가 병렬처리 가능

튜닝 가능

 

java8 default gc

-XX:+UseParallelGC

G1GC가 자바7부터 나왔기 때문에 사용 가능 다만 기본값은 아니라는 점

 

주로 사용되는 GC

  1. G1 garbage collector(G1GC)
    1. java 7+ 사용가능
  2. Z garbage collector(ZGC)
    1. java 11+ 사용가능
  3. Parallel garbage collector
    1. java 5+ 사용가능
728x90
반응형

'개발 > java' 카테고리의 다른 글

[jmh] benchmark test  (0) 2022.08.04
[powermock] mock static method  (0) 2022.07.21
[keyword] transient in serialization  (0) 2022.02.16
[junit5] no test found / checked exception is invalid  (0) 2022.02.07
[junit] test runner  (0) 2022.01.03
반응형

들어가며..

마이크로 서비스 아키텍처(MSA) 프로젝트를 개발 및 운영을 하다 보면 도메인 모델은 복잡해지고 점점 설계 시점의 의도와는 다른 방향으로 변질되는 일이 빈번히 발생한다. 특히 요즘처럼 고차원적인 UX, 급변하는 IT 시장의 흐름으로 인해 시도 때도 없이 달라지는 기획팀/사업부의 요구사항을 충족하는 모델을 만드는 건 더욱 어려운 일이 되었다. 게다가 이렇게 복잡한 내용을 하나의 화면에서 다 보여달라고 하니.. 아무리 인덱스를 추가하고 쿼리를 튜닝하더라도 조회 속도가 나지 않고, n번의 api를 결과를 합치면서 생기는 실수, 더러운 소스코드 등은 결국 서비스의 질을 낮추기에 충분해진다.

(내가 경험한) MSA

이게 왜 어려워졌을까? 데이터의 변경과 조회 시 필요한 데이터가 관점에 따라 명백히 다른데, 이걸 하나의 모델/애플리케이션/디비에서 해결하려다 보니 (각 영역에서 필요하지 않은 속성들로 인해) 복잡도가 증가하고 변질되는 것은 아닐까?

그럼 어떻게 이 문제를 해결 할 수 있을까? 데이터의 변경과 조회를 나누면 되지 않을까? 해서 나온 게 CQRS이다.

 

CQRS란?

마이크로 서비스의 패턴이 무려 44가지나 있다고 하는데, 간단하게 관련 용어 몇 가지만 살펴본다.

  •  DDD - Domain Driven Design 도메인 주도 개발(방법론)
    • 비즈니스를 도메인 별로 나누어 설계하는 방식/접근법
  • EDA - Event Driven Architecture
    • 분산된 시스템 간에 이벤트를 생성, 발행(publishing)하고 발행된 이벤트를 필요로 하는 수신자(subscriber)에게 전송되는 흐름으로 구성된 아키텍처로 이벤트를 수신한 수신자가 이벤트를 처리하는 형태임
    • Architectural Patterns
  • CQRS - Command Query Responsibility Segregation 패턴 
    • "CQRS는 DDD 기반의 Object Model 방법론 적용 시 나타났던 문제점들을 해결하기 위해 등장했다"
  • Aggregate Pattern
  • Saga Pattern
    • MSA 기반의 분산 시스템에서 분산된 DB의 정합성을 보장하는 방법, 트랜젝션 관리주체는 애플리케이션(not DB)
    • sequence diagram을 따라가다(transaction flow 속에서) rollback이 필요할 때 처리하는 방법
    • Choreography-Based Saga(각자 알아서 처리)
    • Orchestration-Based Saga(중앙 컨트롤 타워가 정리)
DDD is well suited with some architectural patterns like CQRS, Event Driven Architecture, Event sourcing, etc but they are not required to use DDD.

 

CQRS를 구현할 수 있게 하는 axon framework

  • axon framework는 DDD 패러다임 하에서 event sourcing과 CQRS 패턴을 이용해 애플리케이션을 작성할 수 있도록 도와주는 framework
  • 작성 글: [cqrs] axon framework란

실습

환경: springboot2.6.2 / axon framework 4.2.1 + axon server / postgresql14

1. axon server를 설치하고

2022.01.12 - [서버 세팅 & tool/vm on mac] - [vm] axon server 설치

 

[vm] axon server 설치

2022.01.03 - [세팅/vm on mac] - [vm] virtual box centos7 세팅 [vm] virtual box centos7 세팅 1. oracle virtual box for mac 설치 2. centos7 iso 받기 https://ftp-srv2.kddilabs.jp/Linux/packages/CentOS/..

bangpurin.tistory.com

2. springboot project에 axon dependencies 올려서 개발

axonVersion = "4.2.1"

implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
implementation group: 'org.axonframework', name: 'axon-configuration', version: "$axonVersion"

3. 블로그 흐름 따라가며 개발하였고

https://cla9.tistory.com/2?category=814447 

 

1. Axon Framework 개요

1. 개요 앞으로 진행될 포스팅은 MSA에 관심 많은 분을 대상으로 DDD, CQRS 및 Event Sourcing 내용을 알고 있다고 가정하고, Spring 환경에서 AxonFramework를 활용해 개념 구현하는 방법에 대해 소개하고자

cla9.tistory.com

4. 개발하면서 막히는 부분/ 깨달은 부분은 따로 정리

2022.01.12 - [개발/axon framework] - [axon] command/query project 생성 - clone coding

5. 깃에 등록

https://github.com/haileyjhbang/cqrs-clone.git


axon framework 구조

axon framework

Command Applcation

  • Event-Sourced Aggregate: EventStore로부터 Event를 재생하면서 모델을 최신 상태로
  • State-Stored Aggregate: EventStore에 Event를 적재와 별개로 모델 자체에 최신 상태를 DB에 저장

Query Application

  • Point to Point Query: 하나의 QueryHandler를 찾아 진행
  • Scatter-Gather Query: 동일한 Query를 처리하는 Handler가 여러 App에 등록되어있을 때, 이를 처리하는 방법
  • Subscription Query: Point to Point Query를 요청하였을 때, 만약 Query를 수행하는 Read Model이 바뀌었다면, 화면에 출력되는 결과와 Read Model 사이 데이터 정합성 불일치 문제가 발생한다. 따라서 이를 해결하기 위해 주기적으로 Query를 재요청하는 방식

 

추가) Saga pattern in axon

axon framework에서도 분산 트랜젝션을 위해 saga 패턴을 지원한다(saga event 정보를 db에 저장).

//어노테이션
@StartSaga
@SagaEventHandler(associationProperty = "transferID")
@EndSaga


//함수 호출방식
SagaLifecycle.associateWith("accountID", event.getDstAccountID());
SagaLifecycle.end();

[axon - saga; orchestration] https://cla9.tistory.com/22?category=814447

 

활용방안?

초기 그림의 MSA 구조의 시스템 & 대용량 처리가 필요한 시스템에 적용하면 효과적인 대안이 될 수 있을 것이란 생각이 들었다. ES 전파만 확실하면 쿼리 성능도 좋아질 것이고 두 애플리케이션의 결합도도 낮아 독립적인 스캐일 업 또한 가능할 것이다.

하지만 러닝 커브의 압박, 분산된 구조로 인한 트러블 포인트의 증가, 서비스 복잡도 증가 그리고 DB를 많이 쓰기 때문에 유능한 DBA나 넉넉한 DB공간 확보 필요 등의 비용이 많이 들 수 있어 비즈니스 로직이 간단한 서비스면 굳이 도입할 필요는 없을 것 같다.


참고

https://docs.axoniq.io/reference-guide/

 

Introduction - Axon Reference Guide

The standard version, called "Axon Server", is open source and free to download and use. It is provided under an AxonIQ-specific open source license. While this license allows you to run the software freely in any environment, it is less permissive than th

docs.axoniq.io

 

책: 마이크로 서비스 패턴(Chris Richardson 저)

http://www.yes24.com/Product/Goods/86542732

 

마이크로서비스 패턴 - YES24

모놀리식 애플리케이션을 마이크로서비스 아키텍처로 성공적으로 전환하는 방법마이크로서비스 아키텍처 기반의 애플리케이션을 성공적으로 구축하려면 새로운 아키텍처의 개념을 이해하는

www.yes24.com

요약본: https://microservices.io/patterns/index.html

 

Microservices Pattern: A pattern language for microservices

Microservices.io is brought to you by Chris Richardson. Experienced software architect, author of POJOs in Action, the creator of the original CloudFoundry.com, and the author of Microservices patterns. Chris helps clients around the world adopt the micros

microservices.io

 

https://app.mural.co/t/cloudingegration6924/m/cloudingegration6924/1598872302455/cb40356de0e1fcc36618a25f5f5e2ed18761f3ca

 

mSVC Patterns

 

app.mural.co

 

728x90
반응형

'개발 > axon framework' 카테고리의 다른 글

[axon] saga (2) - clone coding  (0) 2022.02.18
[axon] saga (1) - clone coding  (0) 2022.02.18
[axon] query handler(2) - clone coding  (0) 2022.02.14
[axon] query handler(1) - clone coding  (0) 2022.02.11
[axon] event upcasting - clone coding  (0) 2022.02.04
반응형

이전 글: 2022.02.18 - [개발/spring] - [axon] saga (1) - clone coding

 

[axon] saga (1) - clone coding

이전 글: 2022.02.14 - [개발/spring] - [axon] query handler(2) - clone coding 클론 코딩 참고 블로그는 다음과 같다: https://cla9.tistory.com/23?category=814447 19. Saga 패턴을 활용한 트랜잭션 관리 -..

bangpurin.tistory.com

클론 코딩 참고 블로그는 다음과 같다:  https://cla9.tistory.com/24?category=814447 

 

20. Saga 패턴을 활용한 트랜잭션 관리 - 3

1. 서론 이번 포스팅은 AxonFramework 관련 마지막 포스팅입니다. Saga 패턴 보상 트랜잭션 구현을 다루겠습니다. 2. Deadline MSA 환경에서는 App이 여러개로 분산되어있으므로 하나의 App이 느려지거나 장

cla9.tistory.com

 

Saga 트랜잭션에서 실패가 났을 경우 recovery 하는 방법에 대한 글이다. 딱히 신기술이나 신개념이 나오는 것은 아니고 exception을  try/catch 문으로 잡아서 catch문에서 새로운 이벤트를 전파시켜 롤백하는 과정이다. 즉, 로직적으로 해결하는 것이라서 흐름만 잘 따라가면 된다.

해당 글에서는 두 가지 케이스에 대해 다룬다.

  1. 타임아웃으로 인한 잔액 복구 케이스
  2. 타임아웃이 났는데 어차피 잔액이 모자라서 진행이 안 되는 케이스(하지만 진행을 했다가 취소하는 방식으로 진행한다; 이벤트는 삭제할 수 없기 때문에 취소 이벤트를 발생해줘야 한다)

각 흐름에 대해 정리한다. 소스를 보면서 따라가야 하며 로그를 같이 보면 훨씬 빠르게 접근할 수 있다. 

소스는 역시 깃허브에..

 

  • 타임아웃으로 인한 잔액 복구 케이스

중간에 요청 없어짐 표시는 무시해도 된다. 처음에 테스트할 때 잔액 차감 성공 이벤트가 로그에 찍혔는데, 잔액을 복구하는 로직이 없어서 이상하다 싶었는데 다시 확인해보니, 잔액 차감 성공(TransferApprovedCommand) 시 로직이 덜 짜져 있어서(if (!isExecutingCompensation && !isAbortingCompensation)이 부분이 없었다.) 잘못된 코드였고, 수정해서 안 나오는 게 맞다는 것을 확인했다. 즉, 잔액 차감 성공(TransferApprovedCommand) 을 받았지만 취소 중이면 차감을 아애 안 하는 로직인 것임.

괄호의 숫자는 로그의 시간이다. 시간순으로 나열한 것은 아니므로 주의해서 확인해야한다..

특이점은  jeju에서는 잔액이 감소(이체 성공)하고 다시 복구(이체 취소)하는 과정을 거치지만, command 쪽은 아예 잔액 변경이 없다.

--------------------command
commandGateway.sendAndWait(MoneyTransferCommand)
@CommandHandler transferMoney(MoneyTransferCommand)
	AggregateLifecycle.apply(MoneyTransferEvent)
///사가 시작
@StartSaga @SagaEventHandler(associationProperty = "transferID") on(MoneyTransferEvent)
	commandGateway.sendAndWait(JejuBankTransferCommand) (17:01:23.740)
	10초 기다리다 익셉션 (17:01:33.744)
--------------------jeju
@CommandHandler on(JejuBankTransferCommand)
	15초 홀딩하던도중 취소 요청을 받긴하지만 홀딩 후 성공처리 먼저함
	AggregateLifecycle.apply(TransferApprovedEvent) (17:01:38.790)
@EventSourcingHandler on(TransferApprovedEvent)
	잔액 차감 (17:01:38.791)
--------------------command
//요청 없어짐 
//{익셉션 (17:01:33.744)시 isExecutingCompensation = true로 바뀌었고 그 후에 들어온 이체 성공(잔액 차감) 요청이므로 진행 안 됨}
//@SagaEventHandler(associationProperty = "srcAccountID") on(TransferApprovedEvent)
//	제주 잔액 차감 성공 이벤트 리슨 후 반영
//	commandGateway.send(TransferApprovedCommand)
//@CommandHandler transferMoney(TransferApprovedCommand)
//	잔액 증가
//	AggregateLifecycle.apply(DepositMoneyEvent) //for query app
//	AggregateLifecycle.apply(DepositCompletedEvent)// 사가 종료;; 근데 왜 로그 없지?
//요청 없어짐
		
cancelTransfer(17:01:33.744)
	취소 요청
	commandGateway.send(JejuBankCancelTransferCommand)
--------------------jeju
@CommandHandler on(JejuBankCancelTransferCommand)
	취소 요청
	AggregateLifecycle.apply(CompletedCancelTransferEvent) (17:01:38.818)
@EventSourcingHandler on(CompletedCancelTransferEvent) (17:01:38.819)
	잔액 복구 (17:01:38.819)
--------------------command	
@SagaEventHandler(associationProperty = "srcAccountID") on(CompletedCancelTransferEvent)
	계좌이체 취소완료 (17:01:38.854)
///사가 종료

 

  • 타임아웃이 났는데 어차피 잔액이 모자라서 진행이 안 되는 케이스

여기서 특이점은 jeju에서는 잔액이 증가(복구)하고 다시 취소(회수)하는 과정을 거치지만, command쪽은 아애 잔액 변경이 없다. 타임아웃으로 인한 복구가 먼저 진행되고, 잔액이 없어서 이체를 취소하는 요청 시 복구 중인지 확인한 후 복구 중이면 다시 복구를 취소하는 것이다(복구 중이 아니면 그대로 종료).

--------------------command
commandGateway.sendAndWait(MoneyTransferCommand)
@CommandHandler transferMoney(MoneyTransferCommand) (17:34:06.636)
	AggregateLifecycle.apply(MoneyTransferEvent)
///사가 시작
@StartSaga @SagaEventHandler(associationProperty = "transferID") on(MoneyTransferEvent)
	commandGateway.sendAndWait(JejuBankTransferCommand) (17:34:06.708)
	10초 기다리다 익셉션 (17:34:16.715)
cancelTransfer
	취소 요청
	commandGateway.send(JejuBankCancelTransferCommand) (17:34:16.715)
--------------------jeju
@CommandHandler on(JejuBankTransferCommand)
	15초 홀딩하던 도중 취소 요청을 받긴하지만 홀딩 후 성공처리 먼저함
	AggregateLifecycle.apply(TransferDeniedEvent) (17:34:21.832)
@CommandHandler on(JejuBankCancelTransferCommand) (17:34:21.862)
	AggregateLifecycle.apply(CompletedCancelTransferEvent)
@EventSourcingHandler on(CompletedCancelTransferEvent)
	잔액 증가/복구됨 (17:34:21.862)
--------------------command
@SagaEventHandler(associationProperty = "srcAccountID") on(TransferDeniedEvent) (17:34:21.874)
	취소 중이면서 계좌이체도 실패 -> 잔액 복구되면 안됨
	commandGateway.send(JejuBankCompensationCancelCommand) (17:34:21.874)
--------------------jeju
@CommandHandler on(JejuBankCompensationCancelCommand) (17:34:21.896)
	apply(CompletedCompensationCancelEvent)
 @EventSourcingHandler on(CompletedCompensationCancelEvent)
	잔액 다시 감소(17:34:21.900)
--------------------command
@SagaEventHandler(associationProperty = "srcAccountID") @EndSaga on(CompletedCompensationCancelEvent)
	상황 종료 (17:34:21.923)
///사가 종료

 

와 드디어.. axon framework clone coding 20강의 여정이 끝났다. 너무 어렵고 이게 지금 당장 쓰일 수 있을 것 같지 않아서 중간에 그만두고 싶기도 한데, 어쨌든 끝나서 후련하다!!

다음 시간에는 axon framework를 (개인적으로) 총 정리해볼 예정이다.

2022.02.21 - [개발/spring] - [axon] clone coding 후기

 

[axon] clone coding 후기

들어가며.. 마이크로 서비스 아키텍처(MSA) 프로젝트를 개발 및 운영을 하다보면 자연스레 도메인 모델은 복잡해지고 점점 설계 시점의 의도와는 다른 방향으로 변질되는 일이 빈번히 발생한다.

bangpurin.tistory.com

 

728x90
반응형

'개발 > axon framework' 카테고리의 다른 글

[axon] clone coding 후기  (0) 2022.02.21
[axon] saga (1) - clone coding  (0) 2022.02.18
[axon] query handler(2) - clone coding  (0) 2022.02.14
[axon] query handler(1) - clone coding  (0) 2022.02.11
[axon] event upcasting - clone coding  (0) 2022.02.04
반응형

이전 글: 2022.02.14 - [개발/spring] - [axon] query handler(2) - clone coding

 

[axon] query handler(2) - clone coding

이전 글: 2022.02.11 - [개발/spring] - [axon] query handler - clone coding [axon] query handler - clone coding 이전 글: 2022.02.04 - [개발/spring] - [axon] event upcasting - clone coding [axon] eve..

bangpurin.tistory.com

클론 코딩 참고 블로그는 다음과 같다: https://cla9.tistory.com/23?category=814447 

 

19. Saga 패턴을 활용한 트랜잭션 관리 - 2

1. 서론 이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습

cla9.tistory.com

 

 

jeju모듈을 state stored aggregate 방식으로 전환 시 추가로 설정해야 하는 부분이 있다.

1. postgres에 jeju 계정을 생성하고 권한을 부여한다.

# su - postgres 

$ psql 
postgres=# create user jeju with password 'jeju'; 
postgres=# create database jeju owner jeju; 
postgres=# ALTER ROLE jeju WITH createdb; 
postgres=# GRANT ALL PRIVILEGES ON DATABASE jeju TO jeju; 
postgres=# \q

2. 혹시 진행 시 아래와 같은 에러를 만나면..

connection to server on socket "/var/run/postgresql/.s.PGSQL.5432" failed: 치명적오류:  사용자 "jeju"의 peer 인증을 실패했습니다.

아래와 같이 설정을 수정하자.

vi /var/lib/pgsql/14/data/pg_hba.conf vi /var/lib/pgsql/14/data/pg_hba.conf 
//아래와 같이 수정(제주 추가)

# TYPE  DATABASE        USER            ADDRESS                 METHOD

# "local" is for Unix domain socket connections only
local   all             command, query, jeju                    md5
...

설정 후 디비 재시작 해야한다.

systemctl restart postgresql-14

 

command 애플리케이션에 @Saga 어노테이션이 활성화되니까 아래와 같이 axon server에 계속 쿼리를 날린다.

Hibernate: update token_entry set timestamp=? where processor_name=? and segment=? and owner=?

POST localhost:9091/account
Content-Type: application/json

{
	"accountID" : "test",
	"balance" : 100
}

위 api를 쏘고 jeju 애플리케이션을 관찰해본다.

14:02:24.357 DEBUG 11951 --- [mandProcessor-0] o.a.a.c.command.AxonServerCommandBus : Dispatch command [com.cqrs.jeju.command.AccountCreationCommand] locally
14:02:24.358 DEBUG 11951 --- [mandProcessor-0] o.a.commandhandling.SimpleCommandBus : Handling command [com.cqrs.jeju.command.AccountCreationCommand]
14:02:24.361 DEBUG 11951 --- [mandProcessor-0] o.a.m.unitofwork.AbstractUnitOfWork  : Starting Unit Of Work
---- for phase PREPARE_COMMIT
14:02:24.383 DEBUG 11951 --- [mandProcessor-0] com.cqrs.jeju.aggregate.Account      : >>> handling AccountCreationCommand(accountID=test-sender2, balance=1000)
14:02:24.389 DEBUG 11951 --- [mandProcessor-0] com.cqrs.jeju.aggregate.Account      : >>> event AccountCreationEvent(accountID=test-sender2, balance=1000)

Hibernate: insert into account (balance, accountid) values (?, ?)

AxonServerCommandBus 가 기본 command bus이기 때문에 command를 dispatch 하는데.. locally 하게 dispatch 하면 SimpleCommandBus가 핸들링하는가 보다.. 나중에 확인해봐야 하는 포인트

axon framework 에는 UnitOfWork라는 일의 단위가 있는데, 메시지를 처리하는 과정 중 수행해야 하는 액션을 조정하기 위해서 사용된다. UnitOfWork에는 여러 단계가 있고 보통 prepare commit 단계에 액션을 수행하는 것을 볼 수 있다.

또한 로그를 보면 CommandHandler가 수행하여 이벤트를 publish 한 후 EventSourcingHandler가 다시 받아서 처리하는 것을 알 수 있다.


POST http://localhost:8080/transfer
Content-Type: application/json

{
	"srcAccountID" : "test-sender2",
	"dstAccountID" : "5c3bb536-217b-45f1-adbf-edcc3f0a6c05",
	"amount" : 30,
	"bankType" : "JEJU"
}

위 전송 api를 쏘고 흐름을 파악하다가 멘붕이 왔다..

commandApplication jejuApplication queryApplication
transferServiceImpl
commandGateway(MoneyTransferCommand) 

@CommandHandler transferMoney(MoneyTransferCommand):MoneyTransferEvent 

@StartSaga @SagaEventHandler(associationProperty = "transferID") on(MoneyTransferEvent) 

commandGateway(JejuBankTransferCommand)
   
  @CommandHandler on : TransferApprovedEvent

@EventSourcingHandler on(TransferApprovedEvent): 잔액 차감 
 
@SagaEventHandler(associationProperty = "srcAccountID") on(TransferApprovedEvent)

commandGateway(TransferApprovedCommand)

@CommandHandler transferMoney(TransferApprovedCommand): DepositMoneyEvent/DepositCompletedEvent

@EventSourcingHandler -> 잔액 증가, 이벤트 끝
   
    @EventHandler on(DepositMoneyEvent)

 

정리하자면 아래와 같이 흘러간다.

  • commandGateway.send -> @CommandHandler
  • AggregateLifecycle.apply -> @EventSourcingHandler (command)
  • AggregateLifecycle.apply -> @EventHandler (query)
  • SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID()) -> @SagaEventHandler(associationProperty = "srcAccountID")

 

여기서 멘붕이 왔는데, 두 가지 이유 때문이었다.

  1. 로그에 command 쪽에서 잔액 증가/balance 관련 로그가 찍히지 않았다.
  2. 나는 command의 잔액이 증가하면 command 테이블의 account 테이블에 증액이 될 것 같았는데 그렇지 않았다.

1번은 잘못된 코드가 들어 있었다. 예전에 실습에 event-sourced-aggregate방식 -> state-stored-aggregate방식으로 전환하는 부분이 있었는데 이걸 다시 event-sourced-aggregate방식으로 바꿔놓고 이 실습을 진행하는 것이었는데(정확히 말하면 그 부분은 번외라 실습 제외 코드였다), 나는 그걸 모르고 state-stored-aggregate방식에서 진행하다 보니 일부 로직이 빠져있었다. 어쩐지 그 후 강의에서도 몇 번 코드가 이상하다고 느꼈다. 우선 실습을 위해 다시 event-sourced-aggregate방식으로 롤백하여 진행했다.

2번도 사실 1번의 연장인데, 나는 최종 상태가 DB에 있을 것이라 생각했으나 event-sourced-aggregate방식이라 DB에 마지막 값을 저장하지 않고 file에 이벤트만 쌓아두는 방식이었기에 당연한 것이었다. query 쪽에서는 그 이벤트를 받아다가 mv_account에 저장하고 있었으니, 원본 마지막에 command를 확인하는 게 아니라 query.mv_account 테이블을 직접 조회해서 확인하는 것이었다..!

 

그래서 전반적인 로그를 보면, (70 -> 80으로 10 증가)

2022-02-18 10:59:39.104 DEBUG 66614 --- [mandProcessor-2] c.c.command.aggregate.AccountAggregate   : >>> handling MoneyTransferCommand(srcAccountID=test-sender, dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, bankType=JEJU)
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : Created saga instance
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, srcAccountID=test-sender, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, commandFactory=com.cqrs.command.transfer.TransferCommandFactory@2eea704c)
2022-02-18 10:59:39.160  INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, srcAccountID=test-sender, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, commandFactory=com.cqrs.command.transfer.TransferCommandFactory@2eea704c) 
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] o.a.a.c.command.AxonServerCommandBus     : Dispatch command [com.cqrs.command.transfer.JejuBankTransferCommand] with callback
--------------------------- jeju application 으로 전파
Hibernate: select nextval ('hibernate_sequence')
Hibernate: select nextval ('hibernate_sequence')
2022-02-18 10:59:39.175 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Storing saga id 4683dbfe-c141-43b9-b446-7df2b0d1855d as <com.cqrs.command.saga.TransferManager><commandFactory><transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand"><srcAccountID>test-sender</srcAccountID><dstAccountID>5c3bb536-217b-45f1-adbf-edcc3f0a6c05</dstAccountID><amount>10</amount><transferID>6791dff3-1e06-4aa4-88f9-09de7c9c078e</transferID></transferCommand></commandFactory></com.cqrs.command.saga.TransferManager>
Hibernate: insert into saga_entry (revision, saga_type, serialized_saga, saga_id) values (?, ?, ?, ?)
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
2022-02-18 10:59:39.221 DEBUG 66614 --- [ault-executor-1] o.a.a.c.event.axon.AxonServerEventStore  : Received event with token: 83
2022-02-18 10:59:39.249 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Loaded saga id [4683dbfe-c141-43b9-b446-7df2b0d1855d] of type [com.cqrs.command.saga.TransferManager]
2022-02-18 10:59:39.250  INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : 이체 금액 10 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test-sender, dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, amount=10)
2022-02-18 10:59:39.252 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Updating saga id 4683dbfe-c141-43b9-b446-7df2b0d1855d as <com.cqrs.command.saga.TransferManager><commandFactory><transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand"><srcAccountID>test-sender</srcAccountID><dstAccountID>5c3bb536-217b-45f1-adbf-edcc3f0a6c05</dstAccountID><amount>10</amount><transferID>6791dff3-1e06-4aa4-88f9-09de7c9c078e</transferID></transferCommand></commandFactory></com.cqrs.command.saga.TransferManager>
Hibernate: update saga_entry set serialized_saga=?, revision=? where saga_id=?
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
Hibernate: select nextval ('hibernate_sequence')
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> handling TransferApprovedCommand(accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e)
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 80
--------------------------- event sourced aggregate라서 다시 첨부터
2022-02-18 10:59:39.277 DEBUG 66614 --- [mandProcessor-3] o.a.a.c.event.axon.AxonServerEventStore  : Reading events for aggregate id 5c3bb536-217b-45f1-adbf-edcc3f0a6c05
2022-02-18 10:59:39.295 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying AccountCreationEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05)
2022-02-18 10:59:39.296 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=30)
2022-02-18 10:59:39.296 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 30
2022-02-18 10:59:39.307 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=30)
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 60
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 70
2022-02-18 10:59:39.313 DEBUG 66614 --- [ault-executor-1] o.a.a.c.e.AxonServerEventStoreClient     : Done request for 5c3bb536-217b-45f1-adbf-edcc3f0a6c05: 36ms, 12 events
2022-02-18 10:59:39.313 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.313 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 80
2022-02-18 10:59:39.337  INFO 66614 --- [ault-executor-1] o.a.a.c.event.axon.AxonServerEventStore  : Snapshot created
--------------------------- snapshot
2022-02-18 10:59:39.366 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Loaded saga id [4683dbfe-c141-43b9-b446-7df2b0d1855d] of type [com.cqrs.command.saga.TransferManager]
2022-02-18 10:59:39.367  INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e)
Hibernate: delete from association_value_entry where saga_id=?
Hibernate: delete from saga_entry where saga_id=?

로그에서도 알 수 있지만, Command 모듈에서 MoneyTransferEvent Event가 발행되면 saga와 관련한 정보를 디비에 넣고(saga instance 생성) 수정하고 계속 불러오면서 확인하는 것을 알 수 있다. 또한 모든 작업이 끝나면 해당 데이터를 마지막에 삭제하는 것으로 마무리한다.


그래서!! 앞으로는 그 방식을 state-stored-aggregate방식으로 전환하는 작업을 해보도록 한다. 

1. axonConfig.java 의 모든 빈을 주석처리(event-sourced-aggregate의 snapshot 관련 설정이기에)

2. accountAggregate.java를 aggregate/entity 방식으로 다시 바꾸고 아래 함수 작성

@CommandHandler
protected void transferMoney(MoneyTransferCommand command){
    log.debug(">>> handling {}", command);
    //제주 -> command 기 때문에 잔액 검사 필요 없음
    AggregateLifecycle.apply(MoneyTransferEvent.builder()
            .srcAccountID(command.getSrcAccountID())
            .dstAccountID(command.getDstAccountID())
            .amount(command.getAmount())
            .commandFactory(command.getBankType().getCommandFactory(command))
            .transferID(command.getTransferID())
            .build());

}
@CommandHandler
protected void transferMoney(TransferApprovedCommand command){
    this.balance += command.getAmount();
    log.debug("=== balance {}", this.balance);
    AggregateLifecycle.apply(new DepositMoneyEvent(this.holder.getHolderID(), command.getAccountID(), command.getAmount()));
    AggregateLifecycle.apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
}

3. holderAggregate.java 도 aggregate/entity 방식으로 전환

4. accountCreationCommand.java 도 aggregate/entity 방식으로 전환

5. holderRepository 부활, transactionServiceImpl도 기존 방식으로 전환

상세 소스는 깃허브에..

디비를 보면 command 에 150원 들어있고 그걸 projectiong한 query에도 150원 들어있어 있고, jeju에서는 150원이 차감된 것을 확인할 수 있다.

 

 


Saga 공식문서: https://docs.axoniq.io/reference-guide/axon-framework/sagas/associations

 

728x90
반응형

'개발 > axon framework' 카테고리의 다른 글

[axon] clone coding 후기  (0) 2022.02.21
[axon] saga (2) - clone coding  (0) 2022.02.18
[axon] query handler(2) - clone coding  (0) 2022.02.14
[axon] query handler(1) - clone coding  (0) 2022.02.11
[axon] event upcasting - clone coding  (0) 2022.02.04
반응형

코드를 보다보면 종종 transient 라는 키워드를 볼 때가 있다.

public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private TransferCommandFactory commandFactory;
...
}

 

의미?

Serialization is the process of converting an object into a byte stream, and deserialization is the opposite of it.
When we mark any variable as transient, then that variable is not serialized.

Serialization is the process of making the object's state persistent. That means the state of the object is converted into a stream of bytes to be used for persisting (e.g. storing bytes in a file) or transferring (e.g. sending bytes across a network). In the same way, we can use the deserialization to bring back the object's state from bytes. the process of making the object's state persistent.

직렬화(serialization)는 object를 byte stream으로 컨버팅하는 과정이며 역직렬화는 반대 과정을 뜻한다. 직렬화는 객체의 상태를 유지하게하는 과정 중 하나인데, 보통 직렬화를 해서 파일을 만들거나 네트워킹 작업을 하기 때문이다.

transient로 변수를 마킹할 경우 그 변수는 직렬화에 포함되지 않는다(null로 표현됨).

단지 메모리 안에서만 사용되는 변수

 

어디에 주로 사용되나?

  • 한 인스턴스 안에서 계산되어진 값을 임시로 들고 있을 때(for derived fields)
  • 객체의 상태를 나타내는 값이 아닌 변수(not the state of the object)
  • 직렬화가 되지 않는 변수(non-serializable references; throw “java.io.NotSerializableException”)

 

참고1. @Transient

  • entity안에서 사용되는 어노테이션으로 필드를 DB에 저장하지 않도록 해줌.
  • 메모리에 올려다가 놓고 쓰긴하지만 영속성을 주지 않는다는 뜻(not persistent)

참고2. transient vs volatile

  • volitiled은 항상 (thread dependency가 있는 cpu cache가 아닌) 메인 메모리에서 읽어오는 값
  • 메인 메모리에서 read/write하기 때문에 비용이 더 큼(성능 악화)
  • 여러 스레드에서 같은 값으로 사용가능
  • Multi Thread 환경에서 하나의 Thread만 read & write하고 나머지 Thread가 read하는 상황에서 가장 최신의 값을 보장

 

다른 자바 키워드: https://www.w3schools.com/java/java_ref_keywords.asp

 

Java Keywords

W3Schools offers free online tutorials, references and exercises in all the major languages of the web. Covering popular subjects like HTML, CSS, JavaScript, Python, SQL, Java, and many, many more.

www.w3schools.com

 

728x90
반응형

'개발 > java' 카테고리의 다른 글

[jmh] benchmark test  (0) 2022.08.04
[powermock] mock static method  (0) 2022.07.21
[java] jvm, java, gc  (0) 2022.02.24
[junit5] no test found / checked exception is invalid  (0) 2022.02.07
[junit] test runner  (0) 2022.01.03
반응형

이전 글: 2022.02.11 - [개발/spring] - [axon] query handler - clone coding

 

[axon] query handler - clone coding

이전 글: 2022.02.04 - [개발/spring] - [axon] event upcasting - clone coding [axon] event upcasting - clone coding 이전 글: 2022.01.25 - [개발/spring] - [axon] query/replay 성능개선 - clone coding [..

bangpurin.tistory.com

클론 코딩 참고 블로그는 다음과 같다: https://cla9.tistory.com/21?category=814447 

 

17. Query 어플리케이션 구현(Query) - 3

1. 서론 이번 포스팅에서는 Scatter-Gather Query를 구현하겠습니다. Scatter-Gather Query는 동일한 Query를 수행하는 Query Handler가 여러 App에 존재할 경우 모든 App에 Query를 요청하여 결과를 취합받아 최..

cla9.tistory.com

 

지난 시간에 이어 쿼리 핸들링에 대한 이야기이다.

총 세 가지 방법이 있는데 두 가지는 지난 시간에 코딩하였고 오늘은 마지막 방법인 Scatter-Gather Query를 구현한다.

위 블로그를 따라 구현하다보면 마지막 서버 실행 부분에 아래와 같이 circular reference 관련 에러가 난다.

Caused by: org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'org.axonframework.config.Configurer': Requested bean is currently in creation: Is there an unresolvable circular reference?

예전 글에서도 본 적 있는 에런데 아래와 같은 설정을 추가하면 된다. springboot2.6부터 circular reference 가 기본 설정에서 제외되었기 때문이다.

spring: 
  main:
    allow-circular-references: true

 

앱을 모두 실행하고 테스트 해봤을 때 로그를 확인해본다.

1. 쿼리 서비스 흐름

HolderAccountController.getAccountInfoScatterGather -> QueryServiceImpl.getAccountInfoScatterGather -> holderID / 잔액을 dto로 만들어서 제주/서울에게 넘긴 후 결과를 LoanLimitResult.class로 받아오라고 명령 -> 서울/제주의 QueryHandler 가 받아서 처리 -> Query Service에서는 두 곳에서의 응답이 다 오기를 기다리고 결과를 조합하여 리스트로 내려줌

Hibernate: select holderacco0_.holder_id as holder_i1_1_, holderacco0_.account_cnt as account_2_1_, holderacco0_.address as address3_1_, holderacco0_.name as name4_1_, holderacco0_.tel as tel5_1_, holderacco0_.total_balance as total_ba6_1_ from mv_account holderacco0_ where holderacco0_.holder_id=?
///////서울 제주로 요청 보냄

//////응답1 서울
15:57:34.506 DEBUG 8092 --- [ault-executor-1] o.a.a.c.query.AxonServerQueryBus         : Received query response [message_identifier: "204f7d62-8f12-4df7-881d-339387ab3fa2"
payload {
  type: "com.cqrs.loan.LoanLimitResult"
  data: "<com.cqrs.loan.LoanLimitResult><holderID>e5775054-4265-46ef-8116-297ac22f480d</holderID><bankName>SeoulBank</bankName><balance>7980</balance><loanLimit>11970</loanLimit></com.cqrs.loan.LoanLimitResult>"
}
meta_data {
  key: "traceId"
  value {
    text_value: "b540735f-c8a3-48da-96a7-9e90aa752966"
  }
}
meta_data {
  key: "correlationId"
  value {
    text_value: "b540735f-c8a3-48da-96a7-9e90aa752966"
  }
}
request_identifier: "b540735f-c8a3-48da-96a7-9e90aa752966"
]
//////응답2 제주
15:57:34.507 DEBUG 8092 --- [ault-executor-1] o.a.a.c.query.AxonServerQueryBus         : Received query response [message_identifier: "16efaa5a-e221-4740-85b2-ff5478d8ed4e"
payload {
  type: "com.cqrs.loan.LoanLimitResult"
  data: "<com.cqrs.loan.LoanLimitResult><holderID>e5775054-4265-46ef-8116-297ac22f480d</holderID><bankName>JejuBank</bankName><balance>7980</balance><loanLimit>9576</loanLimit></com.cqrs.loan.LoanLimitResult>"
}
meta_data {
  key: "traceId"
  value {
    text_value: "b540735f-c8a3-48da-96a7-9e90aa752966"
  }
}
meta_data {
  key: "correlationId"
  value {
    text_value: "b540735f-c8a3-48da-96a7-9e90aa752966"
  }
}
request_identifier: "b540735f-c8a3-48da-96a7-9e90aa752966"
]

2. 제주

15:57:34.464 DEBUG 8162 --- [ueryProcessor-0] c.c.jeju.component.AccountLoanComponent  : >>> handling LoanLimitQuery(holderID=e5775054-4265-46ef-8116-297ac22f480d, balance=7980)

3. 서울

15:57:34.464 DEBUG 8163 --- [ueryProcessor-0] c.c.s.component.AccountLoanComponent     : >>> handling LoanLimitQuery(holderID=e5775054-4265-46ef-8116-297ac22f480d, balance=7980)

 

결과 화면

 

 

728x90
반응형
반응형

이전 글: 2022.02.04 - [개발/spring] - [axon] event upcasting - clone coding

 

[axon] event upcasting - clone coding

이전 글: 2022.01.25 - [개발/spring] - [axon] query/replay 성능개선 - clone coding [axon] query/replay 성능개선 - clone coding 이전 글: 2022.01.24 - [개발/spring] - [axon] query/replay - clone coding..

bangpurin.tistory.com

클론 코딩 참고 블로그는 다음과 같다: https://cla9.tistory.com/20?category=814447 

 

16. Query 어플리케이션 구현(Query) - 2

1. 서론 이번 시간에는 Query 기능 중 Point to Point, Subscription 기능을 구현합니다. 또한, Query 결과를 보기 위하여 Client 화면을 간략하게 만들겠습니다. Client 화면은 크게 Point to Point Query와 Subs..

cla9.tistory.com

 

이번 글은 쿼리 핸들링에 관한 내용이다.

위 과정 중, 어노테이션 validation 부분이 스프링 버전업으로 빠져있어서 추가해주었다.

2022.02.11 - [개발/spring] - [annotation] NotNull NotBlank NonNull NotEmpty...

 

[annotation] NotNull NotBlank NonNull NotEmpty...

스프링에서 어노테이션 검증을 사용하면 별도의 로직없이 setter가 작용할 때(컨트롤러에 들어오기에도 전) 바로 변수 검증을 진행한다. 인입 로그 없이 에러가 나서 당혹스러울 수는 있지만 잘

bangpurin.tistory.com

 

1. Point to Point Query

api를 실행했을 때의 흐름을 살펴보면 HolderAccountController.getAccountInfo -> QueryServiceImpl.getAccountInfo -> queryGateway -> queryHandler로 가는 것을 알 수 있다.
여기를 어떻게 찾아가는지 확인하기 위해 아래와 같이 같은 request/response를 가지는 핸들러를 하나 더 만들어서 테스트를 해봤는데.. 

////on3 작동    
@QueryHandler
public HolderAccountSummary on3(AccountQuery query){
    log.debug(">>> handling fake {}", query);
    HolderAccountSummary res = new HolderAccountSummary();
    res.setName("test");
    return res;
}

@QueryHandler
public HolderAccountSummary one(AccountQuery query){
    log.debug(">>> handling queryHandler {}", query);
    return repository.findByHolderId(query.getHolderId()).orElse(null);
}
-----------------------------------
////on2 작동
@QueryHandler
public HolderAccountSummary on3(AccountQuery query){
    log.debug(">>> handling fake {}", query);
    HolderAccountSummary res = new HolderAccountSummary();
    res.setName("test");
    return res;
}

@QueryHandler
public HolderAccountSummary on2(AccountQuery query){
    log.debug(">>> handling queryHandler {}", query);
    return repository.findByHolderId(query.getHolderId()).orElse(null);
}

여러 번 이름을 바꿔서 실행해봤는데, 어째 이름의 alphabetical order.. 가 낮은 순(a-> b-> c..)으로 작동되는 것 같다. 

찾아보니 axon에서도 query handler 의 순서에 대해 명시해놓긴 했으나 명확한 기준이라고 하긴 애매하다.

1. On the actual instance level of the class hierarchy (as returned by this.getClass()), all annotated methods are evaluated
2.If one or more methods are found of which all parameters can be resolved to a value, the method with the most specific type is chosen and invoked
3.If no methods are found on this level of the class hierarchy, the super type is evaluated the same way
4.When the top level of the hierarchy is reached, and no suitable query handler is found, this query handling instance is ignored.

내가 이해한 바로는 1. request/response 타입에 맞는 핸들러인지를 먼저 확인하고, 2. 해당 핸들러가 복수개이면 더 하위 레벨/자세한(상속을 받았다거나) 쪽을 따르는 듯하다. 내가 짠 위 코드는 같은 형태의 핸들러가 복수개이지만 뎁스가 같아서 다른 기준으로 순서를 정했을 터인데... 알파벳순이 왠지 맞는 것 같다..

 

2. Subscription Query

api를 실행했을 때의 흐름을 살펴보면 HolderAccountController.getAccountInfoSubscription -> QueryServiceImpl.getAccountInfoSubscription -> flux를 이용하여 subscribe 하고 있다는 것을 알 수 있다.

event handler 중 바로 노티 받을 곳에서 emit을 하면 subscribe에서 받는 구조.

화면은 SSE(Server Sent Event) 방식으로 구현되어 있으며 EventSource 객체를 사용하였다.

최초에 ui의 조회 버튼으로 커낵션을 연결하면

queryResult.initialResult().subscribe(emitter::next);

위 로직으로 인해 화면에 현 상태가 화면에 먼저 뿌려지며, 그 후에는 버튼을 누르지 않아도 emit 된 값이 listen 중이던 flux 쪽으로 와서 doOnNext를 타고 ui로 간다. ui에서도 비동기를 받을 수 있는 EventSource객체로 url을 호출한지라 eventSource.onmessage 함수에서 자동으로 데이터를 가져와 화면에 뿌려준다. 조회는 한 번만 눌렀을 뿐인데 화면에 변한 값이 자동으로 보인다.(신기하다..)

이후 종료를 누르면 그 후 일어난 변화에 대해서는 자동으로 표시해주지 않는다.

//현 상태 확인
14:07:44.689 DEBUG 6007 --- [nio-9090-exec-1] com.cqrs.query.service.QueryServiceImpl  : >>> queryServiceImpl getAccountInfoSubscription handling AccountQuery(holderId=e5775054-4265-46ef-8116-297ac22f480d)
14:07:44.693 DEBUG 6007 --- [nio-9090-exec-1] o.a.a.c.query.AxonServerQueryBus         : Subscription Query requested with subscription Id [340e3830-c7f5-44b9-8f03-98ddb1722cca]
14:07:44.790 DEBUG 6007 --- [ault-executor-1] c.c.q.p.HolderAccountProjection          : >>> handling queryHandler AccountQuery(holderId=e5775054-4265-46ef-8116-297ac22f480d)
//10원 인출
14:08:04.926 DEBUG 6007 --- [sor[accounts]-1] c.c.q.p.HolderAccountProjection          : >>> projecting WithdrawMoneyEvent(holderID=e5775054-4265-46ef-8116-297ac22f480d, accountID=fffb8fa2-94dd-4e84-b7fd-072d09d01d33, amount=10) , timestamp : 2022-02-11T05:08:04.842Z
14:08:04.932 DEBUG 6007 --- [sor[accounts]-1] c.c.q.p.HolderAccountProjection          : >>> getHolder : e5775054-4265-46ef-8116-297ac22f480d 
14:08:04.978 DEBUG 6007 --- [ault-executor-0] com.cqrs.query.service.QueryServiceImpl  : doOnNext : com.cqrs.query.entity.HolderAccountSummary@7d175f21, isCanceled false
//10원 인출
14:08:13.930 DEBUG 6007 --- [sor[accounts]-1] c.c.q.p.HolderAccountProjection          : >>> projecting WithdrawMoneyEvent(holderID=e5775054-4265-46ef-8116-297ac22f480d, accountID=fffb8fa2-94dd-4e84-b7fd-072d09d01d33, amount=10) , timestamp : 2022-02-11T05:08:13.907Z
14:08:13.930 DEBUG 6007 --- [sor[accounts]-1] c.c.q.p.HolderAccountProjection          : >>> getHolder : e5775054-4265-46ef-8116-297ac22f480d 
14:08:13.943 DEBUG 6007 --- [ault-executor-1] com.cqrs.query.service.QueryServiceImpl  : doOnNext : com.cqrs.query.entity.HolderAccountSummary@358555d0, isCanceled false
//4000원 추가
14:08:34.520 DEBUG 6007 --- [sor[accounts]-1] c.c.q.p.HolderAccountProjection          : >>> projecting DepositMoneyEvent(holderID=e5775054-4265-46ef-8116-297ac22f480d, accountID=37ddb7c8-0d3b-443d-a6d5-c83d873f0521, amount=4000) , timestamp : 2022-02-11T05:08:34.499Z
14:08:34.521 DEBUG 6007 --- [sor[accounts]-1] c.c.q.p.HolderAccountProjection          : >>> getHolder : e5775054-4265-46ef-8116-297ac22f480d 
14:08:34.536 DEBUG 6007 --- [ault-executor-0] com.cqrs.query.service.QueryServiceImpl  : doOnNext : com.cqrs.query.entity.HolderAccountSummary@3cf7a219, isCanceled false

Subscription 방식은 한번 커넥션을 맺은 상태에서는 별다른 호출이 없어도 자동으로 변화를 감지하기 때문에 사용성이 좋을 것 같긴 하지만 webflux와 EventSource를 사용하면서 개발의 진입장벽이 있을 것으로 보인다. 또한 서버와 클라이언트의 Connection을 유지해야하기 때문에 Subscription Query가 증가할수록 그에 상응하는 Thread 수가 증가한다는 단점이 있다는 것을 확인해야 한다.

ui에서 별다른 유저 액션 없이도 어떻게 계속 동기화된 데이터를 받을 수 있을까를 고민했던 과거의 나에게 약간의 해답을 준 오늘의 실습이었다. 아직 모든 게 완벽히 와닿지는 않지만 여러 번 반복해서 보다 보면 조금씩 이해할 수 있지 않을까...

728x90
반응형
반응형

스프링에서 어노테이션 검증을 사용하면 별도의 로직없이 setter가 작용할 때(컨트롤러에 들어오기에도 전) 바로 변수 검증을 진행한다. 인입 로그 없이 에러가 나서 당혹스러울 수는 있지만 잘못된 변수가 들어와서 문제를 일으킬 우려를 아애 차단하는 방법이다.

가장 유명한 어노테이션 모듈은 javax.validation.constraints 가 제공하는 어노테이션인데 요즘에는 jetbrain이나 롬복에서도 비슷한 기능의 어노테이션이 있는 듯하다(비슷한 이름도 많아서 import 할 때 잘 봐야 한다).

springboot2.3 이후부터는 javax 모듈을 포함하던 어노테이션 검증자가 'org.springframework.boot:spring-boot-starter-web' 에서 제외되어 별도의 dependency가 필요하다.

implementation 'org.springframework.boot:spring-boot-starter-validation'

 

javax.validation.constraints 에서 제공하는 문자열 검증에 많이 쓰이는 세 가지 어노테이션(@NotNull, @NotEmpty, @NotBlank)과 롬복에서 제공하는 @NonNull 어노테이션을 같이 비교해본다. 

  null "" (empty string) "    " 허용 타입
@NotNull invalid valid valid all
@NotEmpty invalid invalid valid CharSequence
Collection
Map
Array
@NotBlank invalid invalid invalid(trimmed length > 0) CharSequence
(String)
@NonNull invalid valid valid  

위 표에서 간단하게 명시하긴 했지만, @NotNull, @NotEmpty, @NotBlank 와 @NonNull은 사실 쓰임이 다르다.

@NotNull, @NotEmpty, @NotBlank가 허용된 타입의 변수에 대한 확인이라면, @NonNull은 null-check 로직을 자동으로 생성해주는 애노테이션이다. 

생성자, 메소드 특정 파라미터에 @NonNull이 달려있으면 롬복은 자동적으로 해당 파라미터에 대한 null check 코드를 생성한다. 이때 null check 코드는 메서드나 생성자의 최상단에 위치한다. 또, 필드에 @NonNull이 달려있으면 해당 필드에 값을 설정하는 메서드들에도 null check 코드를 생성한다.

아래는 구현 예시이다. 

public ResponseEntity<Flux<HolderAccountSummary>> getAccountInfoSubscription(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
    return ResponseEntity.ok()
            .body(queryService.getAccountInfoSubscription(holderId));
}

이렇게 짜놓으면 빌드 클래스에는 아래와 같이 null check 코드가 추가되어 있다.

public ResponseEntity<Flux<HolderAccountSummary>> getAccountInfoSubscription(@PathVariable("id") @NotBlank @NonNull String holderId) {
    if (holderId == null) {
        throw new NullPointerException("holderId is marked non-null but is null");
    } else {
        return ResponseEntity.ok().body(this.queryService.getAccountInfoSubscription(holderId));
    }
}

null check 코드가 추가되었는데도 변수 앞에 @NonNull은 왜 안 지워져 있고 그대로인지는 의문이다..

 

참고로 javax.validation.constraints 라이브러리에 null 체크 말고도 어노테이션으로 검증할 수 있는 것들이 많은데(size, positive, negative, future, email, digits, assertTrue, etc.) 그때그때 찾아보면서 사용해야겠다.


참고: https://jyami.tistory.com/55

 

@Valid 를 이용해 @RequestBody 객체 검증하기

Springboot를 이용해서 어노테이션을 이용한 validation을 하는 방법을 적으려 한다. RestController를 이용하여 @RequestBody 객체를 사용자로부터 가져올 때, 들어오는 값들을 검증할 수 있는 방법을 소개한

jyami.tistory.com

롬복 어노테이션: daleseo.com/lombok-useful-annotations

728x90
반응형

+ Recent posts