반응형

이전 글: 2022.01.24 - [개발/spring] - [axon] query/replay - clone coding

 

[axon] query/replay - clone coding

2022.01.19 - [개발/spring] - [axon] state stored aggregate - clone coding [axon] state stored aggregate - clone coding 2022.01.18 - [개발/spring] - [axon] snapshot - clone coding [axon] snapshot - c..

bangpurin.tistory.com

클론 코딩 참고 블로그는 다음와 같다: https://cla9.tistory.com/17?category=814447 

 

13. Query 어플리케이션 구현(Event) - 3

1. 서론 이전 포스팅에서 Replay에 대해서 학습했습니다. Replay는 신규 Read Model이 추가되거나 기존 모델의 변경이 있을 때, EventStore에서 기존 내역을 전달받아 재수행하는 작업입니다. 따라서 EventSo

cla9.tistory.com

 

지난 시간, replay(초기화 후 이벤트 다시 재생) 시 성능 이슈에 대해 잠깐 언급했는데, 이번 시간에 개선 포인트에 대해 생각해본다.

1. batch size

프로젝트에 아래와 같이 설정되어 있거나 없다면(기본값이 tracking) event processor 중 tracking 모드를 사용 중이다.

axon:
  eventhandling:
    processors:
      name:
        mode: tracking

위 내용은 아래와 같이 스프링 빈으로 등록하여 추가적인 설정을 할 수도 있다. 

배치사이즈 변경, 초기 세그먼트 수 세팅, 스레드 팩토리 지정, 단일스레드 설정, 병렬처리 시 스레드 수 지정 등.. 관련 초기값 및 설정 방법은 아래와 같이 빈을 생성하고 제공되는 함수를 통해 여러 값을 변경하면 된다.

@Configuration
public class AxonConfig {

    @Autowired
    public void configure(EventProcessingConfigurer eventProcessingConfigurer){
        eventProcessingConfigurer.registerTrackingEventProcessor(
               "accounts",
                org.axonframework.config.Configuration::eventStore,
                configuration -> TrackingEventProcessorConfiguration
                        .forSingleThreadedProcessing()
                        .andBatchSize(100) //default 1
        );
    }
}

우선 싱글스레드로 설정하였고 배치 사이즈를 100으로 조정하였다.

여기서 name(위 예시에서 "accounts")을 주어야 하는데, processingGroup의 이름을 넣으면 된다. 이게 뭐냐고..?

사실 저번 시간에 reset 함수를 구현할 때 tracking mode 에 대한 설정을 했었는데..

@Override
public void reset() {
    configuration.eventProcessingConfiguration()
            .eventProcessorByProcessingGroup("accounts", TrackingEventProcessor.class)
            .ifPresent(trackingEventProcessor -> {
                trackingEventProcessor.shutDown();
                trackingEventProcessor.resetTokens(); // 토큰초기화
                trackingEventProcessor.start();
            });
}

여기서 주었던 이름과 processing type 에 힌트가 있었다.

그렇다면 어떤 값들을 바꿀 수 있는건가?

참고로 AbstractEventProcessor 구현체로 TrackingEventProcessor와 SubscribingEventProcessor가 있는데, 우리는 tracking모드로 구현하였으니 TrackingEventProcessor를 사용한다. 그리고 TrackingEventProcessor가 사용하는 설정 값은 TrackingEventProcessorConfiguration 에 있다.

기본값은 아래와 같다.

  • Thread 수 : 1개
  • Batch Size : 1
  • 최대 Thread 수 : Segment 개수
  • TokenClaim 주기 : 5000ms
TrackingEventProcessorConfiguration.java

private static final int DEFAULT_BATCH_SIZE = 1;
private static final int DEFAULT_THREAD_COUNT = 1;
//단위 ms
private static final int DEFAULT_TOKEN_CLAIM_INTERVAL = 5000;
private int eventAvailabilityTimeout = 1000;

private TrackingEventProcessorConfiguration(int numberOfSegments) {
    this.batchSize = DEFAULT_BATCH_SIZE;
    this.initialSegmentCount = numberOfSegments;
    this.maxThreadCount = numberOfSegments;
    this.threadFactory = pn -> new AxonThreadFactory("EventProcessor[" + pn + "]");
    this.tokenClaimInterval = DEFAULT_TOKEN_CLAIM_INTERVAL;
}

위 클래스에 정의된 함수를 사용하여 설정 값을 변경하면 된다.

 

2. 병렬처리

성능 향상을 고민할 때, 보통 생각하는 방법이 병렬 처리 기법이다. 허나 여기서 병렬로 처리 시 이벤트의 순서가 꼬이게 될 가능성이 있는데, axon에서는 이를 위해 sequencing 정책을 제공한다. 위 설정에서 단일 스레드를 병렬로 변경하고 스레드 갯수를 지정함다.

@Configuration
public class AxonConfig {

    @Autowired
    public void configure(EventProcessingConfigurer eventProcessingConfigurer){
        eventProcessingConfigurer.registerTrackingEventProcessor(
          "accounts",
                org.axonframework.config.Configuration::eventStore,
                configuration -> TrackingEventProcessorConfiguration
                        .forParallelProcessing(3) ////
                        .andBatchSize(100)
        );

        eventProcessingConfigurer.registerSequencingPolicy(
            "accounts",
                configuration -> SequentialPerAggregatePolicy.instance() ////
        );
    }
}

하지만 이렇게 해도 aggregate가 다르면 다른 스레드에서 실행될 수 있어 (예를 들어) 계좌 생성 이전에 잔액 조회가 실행되어 에러가 날 수 있다.

이를 위해 실패 시 자동으로 재시도하게 하는 spring-retry를 사용한다. 

implementation 'org.springframework.retry:spring-retry'
 
 //service @EnableRetry 추가
@EnableRetry
@Component
@RequiredArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
 
 //method @Retryable 추가; 언제(에러발생 시) / 몇 번 시도(5회) / 언제 시도(1초 후)
@Retryable(value = {NoSuchElementException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000))
@EventHandler
@AllowReplay
protected void on(AccountCreationEvent event, @Timestamp Instant instant){

 

기타 포인트는 클론 코딩 참고 블로그에 자세히 기술되어 있어 읽어보면 되겠다.

728x90
반응형

+ Recent posts