[axon] saga (1) - clone coding
이전 글: 2022.02.14 - [개발/spring] - [axon] query handler(2) - clone coding
[axon] query handler(2) - clone coding
이전 글: 2022.02.11 - [개발/spring] - [axon] query handler - clone coding [axon] query handler - clone coding 이전 글: 2022.02.04 - [개발/spring] - [axon] event upcasting - clone coding [axon] eve..
bangpurin.tistory.com
클론 코딩 참고 블로그는 다음과 같다: https://cla9.tistory.com/23?category=814447
19. Saga 패턴을 활용한 트랜잭션 관리 - 2
1. 서론 이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습
cla9.tistory.com
jeju모듈을 state stored aggregate 방식으로 전환 시 추가로 설정해야 하는 부분이 있다.
1. postgres에 jeju 계정을 생성하고 권한을 부여한다.
# su - postgres
$ psql
postgres=# create user jeju with password 'jeju';
postgres=# create database jeju owner jeju;
postgres=# ALTER ROLE jeju WITH createdb;
postgres=# GRANT ALL PRIVILEGES ON DATABASE jeju TO jeju;
postgres=# \q
2. 혹시 진행 시 아래와 같은 에러를 만나면..
connection to server on socket "/var/run/postgresql/.s.PGSQL.5432" failed: 치명적오류: 사용자 "jeju"의 peer 인증을 실패했습니다.
아래와 같이 설정을 수정하자.
vi /var/lib/pgsql/14/data/pg_hba.conf vi /var/lib/pgsql/14/data/pg_hba.conf
//아래와 같이 수정(제주 추가)
# TYPE DATABASE USER ADDRESS METHOD
# "local" is for Unix domain socket connections only
local all command, query, jeju md5
...
설정 후 디비 재시작 해야한다.
systemctl restart postgresql-14
command 애플리케이션에 @Saga 어노테이션이 활성화되니까 아래와 같이 axon server에 계속 쿼리를 날린다.
Hibernate: update token_entry set timestamp=? where processor_name=? and segment=? and owner=?
POST localhost:9091/account
Content-Type: application/json
{
"accountID" : "test",
"balance" : 100
}
위 api를 쏘고 jeju 애플리케이션을 관찰해본다.
14:02:24.357 DEBUG 11951 --- [mandProcessor-0] o.a.a.c.command.AxonServerCommandBus : Dispatch command [com.cqrs.jeju.command.AccountCreationCommand] locally
14:02:24.358 DEBUG 11951 --- [mandProcessor-0] o.a.commandhandling.SimpleCommandBus : Handling command [com.cqrs.jeju.command.AccountCreationCommand]
14:02:24.361 DEBUG 11951 --- [mandProcessor-0] o.a.m.unitofwork.AbstractUnitOfWork : Starting Unit Of Work
---- for phase PREPARE_COMMIT
14:02:24.383 DEBUG 11951 --- [mandProcessor-0] com.cqrs.jeju.aggregate.Account : >>> handling AccountCreationCommand(accountID=test-sender2, balance=1000)
14:02:24.389 DEBUG 11951 --- [mandProcessor-0] com.cqrs.jeju.aggregate.Account : >>> event AccountCreationEvent(accountID=test-sender2, balance=1000)
Hibernate: insert into account (balance, accountid) values (?, ?)
AxonServerCommandBus 가 기본 command bus이기 때문에 command를 dispatch 하는데.. locally 하게 dispatch 하면 SimpleCommandBus가 핸들링하는가 보다.. 나중에 확인해봐야 하는 포인트
axon framework 에는 UnitOfWork라는 일의 단위가 있는데, 메시지를 처리하는 과정 중 수행해야 하는 액션을 조정하기 위해서 사용된다. UnitOfWork에는 여러 단계가 있고 보통 prepare commit 단계에 액션을 수행하는 것을 볼 수 있다.
또한 로그를 보면 CommandHandler가 수행하여 이벤트를 publish 한 후 EventSourcingHandler가 다시 받아서 처리하는 것을 알 수 있다.
POST http://localhost:8080/transfer
Content-Type: application/json
{
"srcAccountID" : "test-sender2",
"dstAccountID" : "5c3bb536-217b-45f1-adbf-edcc3f0a6c05",
"amount" : 30,
"bankType" : "JEJU"
}
위 전송 api를 쏘고 흐름을 파악하다가 멘붕이 왔다..
commandApplication | jejuApplication | queryApplication |
transferServiceImpl commandGateway(MoneyTransferCommand) @CommandHandler transferMoney(MoneyTransferCommand):MoneyTransferEvent @StartSaga @SagaEventHandler(associationProperty = "transferID") on(MoneyTransferEvent) commandGateway(JejuBankTransferCommand) |
||
@CommandHandler on : TransferApprovedEvent @EventSourcingHandler on(TransferApprovedEvent): 잔액 차감 |
||
@SagaEventHandler(associationProperty = "srcAccountID") on(TransferApprovedEvent) commandGateway(TransferApprovedCommand) @CommandHandler transferMoney(TransferApprovedCommand): DepositMoneyEvent/DepositCompletedEvent @EventSourcingHandler -> 잔액 증가, 이벤트 끝 |
||
@EventHandler on(DepositMoneyEvent) |
정리하자면 아래와 같이 흘러간다.
- commandGateway.send -> @CommandHandler
- AggregateLifecycle.apply -> @EventSourcingHandler (command)
- AggregateLifecycle.apply -> @EventHandler (query)
- SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID()) -> @SagaEventHandler(associationProperty = "srcAccountID")
여기서 멘붕이 왔는데, 두 가지 이유 때문이었다.
- 로그에 command 쪽에서 잔액 증가/balance 관련 로그가 찍히지 않았다.
- 나는 command의 잔액이 증가하면 command 테이블의 account 테이블에 증액이 될 것 같았는데 그렇지 않았다.
1번은 잘못된 코드가 들어 있었다. 예전에 실습에 event-sourced-aggregate방식 -> state-stored-aggregate방식으로 전환하는 부분이 있었는데 이걸 다시 event-sourced-aggregate방식으로 바꿔놓고 이 실습을 진행하는 것이었는데(정확히 말하면 그 부분은 번외라 실습 제외 코드였다), 나는 그걸 모르고 state-stored-aggregate방식에서 진행하다 보니 일부 로직이 빠져있었다. 어쩐지 그 후 강의에서도 몇 번 코드가 이상하다고 느꼈다. 우선 실습을 위해 다시 event-sourced-aggregate방식으로 롤백하여 진행했다.
2번도 사실 1번의 연장인데, 나는 최종 상태가 DB에 있을 것이라 생각했으나 event-sourced-aggregate방식이라 DB에 마지막 값을 저장하지 않고 file에 이벤트만 쌓아두는 방식이었기에 당연한 것이었다. query 쪽에서는 그 이벤트를 받아다가 mv_account에 저장하고 있었으니, 원본 마지막에 command를 확인하는 게 아니라 query.mv_account 테이블을 직접 조회해서 확인하는 것이었다..!
그래서 전반적인 로그를 보면, (70 -> 80으로 10 증가)
2022-02-18 10:59:39.104 DEBUG 66614 --- [mandProcessor-2] c.c.command.aggregate.AccountAggregate : >>> handling MoneyTransferCommand(srcAccountID=test-sender, dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, bankType=JEJU)
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager : Created saga instance
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager : event : MoneyTransferEvent(dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, srcAccountID=test-sender, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, commandFactory=com.cqrs.command.transfer.TransferCommandFactory@2eea704c)
2022-02-18 10:59:39.160 INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, srcAccountID=test-sender, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, commandFactory=com.cqrs.command.transfer.TransferCommandFactory@2eea704c)
2022-02-18 10:59:39.160 DEBUG 66614 --- [gerProcessor]-0] o.a.a.c.command.AxonServerCommandBus : Dispatch command [com.cqrs.command.transfer.JejuBankTransferCommand] with callback
--------------------------- jeju application 으로 전파
Hibernate: select nextval ('hibernate_sequence')
Hibernate: select nextval ('hibernate_sequence')
2022-02-18 10:59:39.175 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore : Storing saga id 4683dbfe-c141-43b9-b446-7df2b0d1855d as <com.cqrs.command.saga.TransferManager><commandFactory><transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand"><srcAccountID>test-sender</srcAccountID><dstAccountID>5c3bb536-217b-45f1-adbf-edcc3f0a6c05</dstAccountID><amount>10</amount><transferID>6791dff3-1e06-4aa4-88f9-09de7c9c078e</transferID></transferCommand></commandFactory></com.cqrs.command.saga.TransferManager>
Hibernate: insert into saga_entry (revision, saga_type, serialized_saga, saga_id) values (?, ?, ?, ?)
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
2022-02-18 10:59:39.221 DEBUG 66614 --- [ault-executor-1] o.a.a.c.event.axon.AxonServerEventStore : Received event with token: 83
2022-02-18 10:59:39.249 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore : Loaded saga id [4683dbfe-c141-43b9-b446-7df2b0d1855d] of type [com.cqrs.command.saga.TransferManager]
2022-02-18 10:59:39.250 INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager : 이체 금액 10 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test-sender, dstAccountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e, amount=10)
2022-02-18 10:59:39.252 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore : Updating saga id 4683dbfe-c141-43b9-b446-7df2b0d1855d as <com.cqrs.command.saga.TransferManager><commandFactory><transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand"><srcAccountID>test-sender</srcAccountID><dstAccountID>5c3bb536-217b-45f1-adbf-edcc3f0a6c05</dstAccountID><amount>10</amount><transferID>6791dff3-1e06-4aa4-88f9-09de7c9c078e</transferID></transferCommand></commandFactory></com.cqrs.command.saga.TransferManager>
Hibernate: update saga_entry set serialized_saga=?, revision=? where saga_id=?
Hibernate: insert into association_value_entry (association_key, association_value, saga_id, saga_type, id) values (?, ?, ?, ?, ?)
Hibernate: select nextval ('hibernate_sequence')
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> handling TransferApprovedCommand(accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e)
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.262 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : === balance 80
--------------------------- event sourced aggregate라서 다시 첨부터
2022-02-18 10:59:39.277 DEBUG 66614 --- [mandProcessor-3] o.a.a.c.event.axon.AxonServerEventStore : Reading events for aggregate id 5c3bb536-217b-45f1-adbf-edcc3f0a6c05
2022-02-18 10:59:39.295 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> applying AccountCreationEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05)
2022-02-18 10:59:39.296 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=30)
2022-02-18 10:59:39.296 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : === balance 30
2022-02-18 10:59:39.307 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=30)
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : === balance 60
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.308 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : === balance 70
2022-02-18 10:59:39.313 DEBUG 66614 --- [ault-executor-1] o.a.a.c.e.AxonServerEventStoreClient : Done request for 5c3bb536-217b-45f1-adbf-edcc3f0a6c05: 36ms, 12 events
2022-02-18 10:59:39.313 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : >>> applying DepositMoneyEvent(holderID=98228ee7-9571-4c52-8023-2137c889abd0, accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, amount=10)
2022-02-18 10:59:39.313 DEBUG 66614 --- [mandProcessor-3] c.c.command.aggregate.AccountAggregate : === balance 80
2022-02-18 10:59:39.337 INFO 66614 --- [ault-executor-1] o.a.a.c.event.axon.AxonServerEventStore : Snapshot created
--------------------------- snapshot
2022-02-18 10:59:39.366 DEBUG 66614 --- [gerProcessor]-0] o.a.m.saga.repository.jpa.JpaSagaStore : Loaded saga id [4683dbfe-c141-43b9-b446-7df2b0d1855d] of type [com.cqrs.command.saga.TransferManager]
2022-02-18 10:59:39.367 INFO 66614 --- [gerProcessor]-0] com.cqrs.command.saga.TransferManager : 계좌 이체 성공 : DepositCompletedEvent(accountID=5c3bb536-217b-45f1-adbf-edcc3f0a6c05, transferID=6791dff3-1e06-4aa4-88f9-09de7c9c078e)
Hibernate: delete from association_value_entry where saga_id=?
Hibernate: delete from saga_entry where saga_id=?
로그에서도 알 수 있지만, Command 모듈에서 MoneyTransferEvent Event가 발행되면 saga와 관련한 정보를 디비에 넣고(saga instance 생성) 수정하고 계속 불러오면서 확인하는 것을 알 수 있다. 또한 모든 작업이 끝나면 해당 데이터를 마지막에 삭제하는 것으로 마무리한다.
그래서!! 앞으로는 그 방식을 state-stored-aggregate방식으로 전환하는 작업을 해보도록 한다.
1. axonConfig.java 의 모든 빈을 주석처리(event-sourced-aggregate의 snapshot 관련 설정이기에)
2. accountAggregate.java를 aggregate/entity 방식으로 다시 바꾸고 아래 함수 작성
@CommandHandler
protected void transferMoney(MoneyTransferCommand command){
log.debug(">>> handling {}", command);
//제주 -> command 기 때문에 잔액 검사 필요 없음
AggregateLifecycle.apply(MoneyTransferEvent.builder()
.srcAccountID(command.getSrcAccountID())
.dstAccountID(command.getDstAccountID())
.amount(command.getAmount())
.commandFactory(command.getBankType().getCommandFactory(command))
.transferID(command.getTransferID())
.build());
}
@CommandHandler
protected void transferMoney(TransferApprovedCommand command){
this.balance += command.getAmount();
log.debug("=== balance {}", this.balance);
AggregateLifecycle.apply(new DepositMoneyEvent(this.holder.getHolderID(), command.getAccountID(), command.getAmount()));
AggregateLifecycle.apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
}
3. holderAggregate.java 도 aggregate/entity 방식으로 전환
4. accountCreationCommand.java 도 aggregate/entity 방식으로 전환
5. holderRepository 부활, transactionServiceImpl도 기존 방식으로 전환
상세 소스는 깃허브에..
디비를 보면 command 에 150원 들어있고 그걸 projectiong한 query에도 150원 들어있어 있고, jeju에서는 150원이 차감된 것을 확인할 수 있다.
Saga 공식문서: https://docs.axoniq.io/reference-guide/axon-framework/sagas/associations