개발/axon framework

[axon] saga (1) - clone coding

방푸린 2022. 2. 18. 11:58
반응형

이전 글: 2022.02.14 - [개발/spring] - [axon] query handler(2) - clone coding

 

[axon] query handler(2) - clone coding

이전 글: 2022.02.11 - [개발/spring] - [axon] query handler - clone coding [axon] query handler - clone coding 이전 글: 2022.02.04 - [개발/spring] - [axon] event upcasting - clone coding [axon] eve..

bangpurin.tistory.com

클론 코딩 참고 블로그는 다음과 같다: https://cla9.tistory.com/23?category=814447 

 

19. Saga 패턴을 활용한 트랜잭션 관리 - 2

1. 서론 이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습

cla9.tistory.com

 

 

jeju모듈을 state stored aggregate 방식으로 전환 시 추가로 설정해야 하는 부분이 있다.

1. postgres에 jeju 계정을 생성하고 권한을 부여한다.

# su - postgres 

$ psql 
postgres=# create user jeju with password 'jeju'; 
postgres=# create database jeju owner jeju; 
postgres=# ALTER ROLE jeju WITH createdb; 
postgres=# GRANT ALL PRIVILEGES ON DATABASE jeju TO jeju; 
postgres=# \q

2. 혹시 진행 시 아래와 같은 에러를 만나면..

connection to server on socket "/var/run/postgresql/.s.PGSQL.5432" failed: 치명적오류:  사용자 "jeju"의 peer 인증을 실패했습니다.

아래와 같이 설정을 수정하자.

vi /var/lib/pgsql/14/data/pg_hba.conf vi /var/lib/pgsql/14/data/pg_hba.conf 
//아래와 같이 수정(제주 추가)

# TYPE  DATABASE        USER            ADDRESS                 METHOD

# "local" is for Unix domain socket connections only
local   all             command, query, jeju                    md5
...

설정 후 디비 재시작 해야한다.

systemctl restart postgresql-14

 

command 애플리케이션에 @Saga 어노테이션이 활성화되니까 아래와 같이 axon server에 계속 쿼리를 날린다.

Hibernate: update token_entry set timestamp=? where processor_name=? and segment=? and owner=?

POST localhost:9091/account
Content-Type: application/json

{
	"accountID" : "test",
	"balance" : 100
}

위 api를 쏘고 jeju 애플리케이션을 관찰해본다.

14:02:24.357 DEBUG 11951 --- [mandProcessor-0] o.a.a.c.command.AxonServerCommandBus : Dispatch command [com.cqrs.jeju.command.AccountCreationCommand] locally
14:02:24.358 DEBUG 11951 --- [mandProcessor-0] o.a.commandhandling.SimpleCommandBus : Handling command [com.cqrs.jeju.command.AccountCreationCommand]
14:02:24.361 DEBUG 11951 --- [mandProcessor-0] o.a.m.unitofwork.AbstractUnitOfWork  : Starting Unit Of Work
---- for phase PREPARE_COMMIT
14:02:24.383 DEBUG 11951 --- [mandProcessor-0] com.cqrs.jeju.aggregate.Account      : >>> handling AccountCreationCommand(accountID=test-sender2, balance=1000)
14:02:24.389 DEBUG 11951 --- [mandProcessor-0] com.cqrs.jeju.aggregate.Account      : >>> event AccountCreationEvent(accountID=test-sender2, balance=1000)

Hibernate: insert into account (balance, accountid) values (?, ?)

AxonServerCommandBus 가 기본 command bus이기 때문에 command를 dispatch 하는데.. locally 하게 dispatch 하면 SimpleCommandBus가 핸들링하는가 보다.. 나중에 확인해봐야 하는 포인트

axon framework 에는 UnitOfWork라는 일의 단위가 있는데, 메시지를 처리하는 과정 중 수행해야 하는 액션을 조정하기 위해서 사용된다. UnitOfWork에는 여러 단계가 있고 보통 prepare commit 단계에 액션을 수행하는 것을 볼 수 있다.

또한 로그를 보면 CommandHandler가 수행하여 이벤트를 publish 한 후 EventSourcingHandler가 다시 받아서 처리하는 것을 알 수 있다.


POST http://localhost:8080/transfer
Content-Type: application/json

{
	"srcAccountID" : "test-sender2",
	"dstAccountID" : "5c3bb536-217b-45f1-adbf-edcc3f0a6c05",
	"amount" : 30,
	"bankType" : "JEJU"
}

위 전송 api를 쏘고 흐름을 파악하다가 멘붕이 왔다..

commandApplication jejuApplication queryApplication
transferServiceImpl
commandGateway(MoneyTransferCommand) 

@CommandHandler transferMoney(MoneyTransferCommand):MoneyTransferEvent 

@StartSaga @SagaEventHandler(associationProperty = "transferID") on(MoneyTransferEvent) 

commandGateway(JejuBankTransferCommand)
   
  @CommandHandler on : TransferApprovedEvent

@EventSourcingHandler on(TransferApprovedEvent): 잔액 차감 
 
@SagaEventHandler(associationProperty = "srcAccountID") on(TransferApprovedEvent)

commandGateway(TransferApprovedCommand)

@CommandHandler transferMoney(TransferApprovedCommand): DepositMoneyEvent/DepositCompletedEvent

@EventSourcingHandler -> 잔액 증가, 이벤트 끝
   
    @EventHandler on(DepositMoneyEvent)

 

정리하자면 아래와 같이 흘러간다.

  • commandGateway.send -> @CommandHandler
  • AggregateLifecycle.apply -> @EventSourcingHandler (command)
  • AggregateLifecycle.apply -> @EventHandler (query)
  • SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID()) -> @SagaEventHandler(associationProperty = "srcAccountID")

 

여기서 멘붕이 왔는데, 두 가지 이유 때문이었다.

  1. 로그에 command 쪽에서 잔액 증가/balance 관련 로그가 찍히지 않았다.
  2. 나는 command의 잔액이 증가하면 command 테이블의 account 테이블에 증액이 될 것 같았는데 그렇지 않았다.

1번은 잘못된 코드가 들어 있었다. 예전에 실습에 event-sourced-aggregate방식 -> state-stored-aggregate방식으로 전환하는 부분이 있었는데 이걸 다시 event-sourced-aggregate방식으로 바꿔놓고 이 실습을 진행하는 것이었는데(정확히 말하면 그 부분은 번외라 실습 제외 코드였다), 나는 그걸 모르고 state-stored-aggregate방식에서 진행하다 보니 일부 로직이 빠져있었다. 어쩐지 그 후 강의에서도 몇 번 코드가 이상하다고 느꼈다. 우선 실습을 위해 다시 event-sourced-aggregate방식으로 롤백하여 진행했다.

2번도 사실 1번의 연장인데, 나는 최종 상태가 DB에 있을 것이라 생각했으나 event-sourced-aggregate방식이라 DB에 마지막 값을 저장하지 않고 file에 이벤트만 쌓아두는 방식이었기에 당연한 것이었다. query 쪽에서는 그 이벤트를 받아다가 mv_account에 저장하고 있었으니, 원본 마지막에 command를 확인하는 게 아니라 query.mv_account 테이블을 직접 조회해서 확인하는 것이었다..!

 

그래서 전반적인 로그를 보면, (70 -> 80으로 10 증가)

2022-02-18 10:59:39.104 DEBUG 66614 --- [mandProcessor-2] c.c.command.aggregate.AccountAggregate   : >>> handling MoneyTransferCommand(srcAccountID=test-sender, dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, bankType=JEJU)
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : Created saga instance
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, srcAccountID=test-sender, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, commandFactory=com.cqrs.command.transfer.TransferCommandFactory@2eea704c)
2022-02-18 10:59:39.160  INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, srcAccountID=test-sender, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, commandFactory=com.cqrs.command.transfer.TransferCommandFactory@2eea704c) 
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] o.a.a.c.command.AxonServerCommandBus     : Dispatch command [com.cqrs.command.transfer.JejuBankTransferCommand] with callback
--------------------------- jeju application 으로 전파
Hibernate: select nextval ('hibernate_sequence')
Hibernate: select nextval ('hibernate_sequence')
2022-02-18 10:59:39.175 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Storing saga id 4683dbfe-c141-43b9-b446-7df2b0d1855d as <com.cqrs.command.saga.TransferManager><commandFactory><transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand"><srcAccountID>test-sender</srcAccountID><dstAccountID>5c3bb536-217b-45f1-adbf-edcc3f0a6c05</dstAccountID><amount>10</amount><transferID>6791dff3-1e06-4aa4-88f9-09de7c9c078e</transferID></transferCommand></commandFactory></com.cqrs.command.saga.TransferManager>
Hibernate: insert into saga_entry (revision, saga_type, serialized_saga, saga_id) values (?, ?, ?, ?)
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
2022-02-18 10:59:39.221 DEBUG 66614 --- [ault-executor-1] o.a.a.c.event.axon.AxonServerEventStore  : Received event with token: 83
2022-02-18 10:59:39.249 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Loaded saga id [4683dbfe-c141-43b9-b446-7df2b0d1855d] of type [com.cqrs.command.saga.TransferManager]
2022-02-18 10:59:39.250  INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : 이체 금액 10 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test-sender, dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, amount=10)
2022-02-18 10:59:39.252 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Updating saga id 4683dbfe-c141-43b9-b446-7df2b0d1855d as <com.cqrs.command.saga.TransferManager><commandFactory><transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand"><srcAccountID>test-sender</srcAccountID><dstAccountID>5c3bb536-217b-45f1-adbf-edcc3f0a6c05</dstAccountID><amount>10</amount><transferID>6791dff3-1e06-4aa4-88f9-09de7c9c078e</transferID></transferCommand></commandFactory></com.cqrs.command.saga.TransferManager>
Hibernate: update saga_entry set serialized_saga=?, revision=? where saga_id=?
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
Hibernate: select nextval ('hibernate_sequence')
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> handling TransferApprovedCommand(accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e)
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 80
--------------------------- event sourced aggregate라서 다시 첨부터
2022-02-18 10:59:39.277 DEBUG 66614 --- [mandProcessor-3] o.a.a.c.event.axon.AxonServerEventStore  : Reading events for aggregate id 5c3bb536-217b-45f1-adbf-edcc3f0a6c05
2022-02-18 10:59:39.295 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying AccountCreationEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05)
2022-02-18 10:59:39.296 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=30)
2022-02-18 10:59:39.296 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 30
2022-02-18 10:59:39.307 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=30)
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 60
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 70
2022-02-18 10:59:39.313 DEBUG 66614 --- [ault-executor-1] o.a.a.c.e.AxonServerEventStoreClient     : Done request for 5c3bb536-217b-45f1-adbf-edcc3f0a6c05: 36ms, 12 events
2022-02-18 10:59:39.313 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.313 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate   : === balance 80
2022-02-18 10:59:39.337  INFO 66614 --- [ault-executor-1] o.a.a.c.event.axon.AxonServerEventStore  : Snapshot created
--------------------------- snapshot
2022-02-18 10:59:39.366 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore   : Loaded saga id [4683dbfe-c141-43b9-b446-7df2b0d1855d] of type [com.cqrs.command.saga.TransferManager]
2022-02-18 10:59:39.367  INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e)
Hibernate: delete from association_value_entry where saga_id=?
Hibernate: delete from saga_entry where saga_id=?

로그에서도 알 수 있지만, Command 모듈에서 MoneyTransferEvent Event가 발행되면 saga와 관련한 정보를 디비에 넣고(saga instance 생성) 수정하고 계속 불러오면서 확인하는 것을 알 수 있다. 또한 모든 작업이 끝나면 해당 데이터를 마지막에 삭제하는 것으로 마무리한다.


그래서!! 앞으로는 그 방식을 state-stored-aggregate방식으로 전환하는 작업을 해보도록 한다. 

1. axonConfig.java 의 모든 빈을 주석처리(event-sourced-aggregate의 snapshot 관련 설정이기에)

2. accountAggregate.java를 aggregate/entity 방식으로 다시 바꾸고 아래 함수 작성

@CommandHandler
protected void transferMoney(MoneyTransferCommand command){
    log.debug(">>> handling {}", command);
    //제주 -> command 기 때문에 잔액 검사 필요 없음
    AggregateLifecycle.apply(MoneyTransferEvent.builder()
            .srcAccountID(command.getSrcAccountID())
            .dstAccountID(command.getDstAccountID())
            .amount(command.getAmount())
            .commandFactory(command.getBankType().getCommandFactory(command))
            .transferID(command.getTransferID())
            .build());

}
@CommandHandler
protected void transferMoney(TransferApprovedCommand command){
    this.balance += command.getAmount();
    log.debug("=== balance {}", this.balance);
    AggregateLifecycle.apply(new DepositMoneyEvent(this.holder.getHolderID(), command.getAccountID(), command.getAmount()));
    AggregateLifecycle.apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
}

3. holderAggregate.java 도 aggregate/entity 방식으로 전환

4. accountCreationCommand.java 도 aggregate/entity 방식으로 전환

5. holderRepository 부활, transactionServiceImpl도 기존 방식으로 전환

상세 소스는 깃허브에..

디비를 보면 command 에 150원 들어있고 그걸 projectiong한 query에도 150원 들어있어 있고, jeju에서는 150원이 차감된 것을 확인할 수 있다.

 

 


Saga 공식문서: https://docs.axoniq.io/reference-guide/axon-framework/sagas/associations

 

728x90
반응형