import java.util.function.Function;
public class FirstClassObjectExample {
public static void main(String[] args) {
// 함수가 변수에 할당됨
Function<String, String> greet = name -> "Hello, " + name + "!";
// 함수가 매개변수로 전달됨
sayHello(greet, "Alice");
// 함수가 반환 값으로 사용됨
Function<String, String> greetFn = getGreetFunction();
System.out.println(greetFn.apply("Bob"));
}
public static void sayHello(Function<String, String> fn, String name) {
System.out.println(fn.apply(name));
}
public static Function<String, String> getGreetFunction() {
return name -> "Hi, " + name + "!";
}
}
고차 함수 (Higher-Order Function)
고차 함수는 다음 중 하나 이상의 조건을 만족하는 함수:
다른 함수를 인자로 받을 수 있다.
다른 함수를 반환할 수 있다.
Java 8의 Stream API는 함수형 프로그래밍의 개념을 적극 활용한다. 메서드 체이닝을 통해 고차 함수의 형태로 map, filter, reduce 등의 연산을 수행할 수 있다.
Java에서 콜백을 처리할 때 고차 함수가 자주 사용된다. 특정 작업이 완료되었을 때 실행할 동작을 함수로 전달할 수 있다.
import java.util.function.Consumer;
import java.util.function.Function;
public class HigherOrderFunctionExample {
public static void main(String[] args) {
// 함수를 매개변수로 받는 고차 함수
repeat(3, i -> System.out.println("Hello, " + i));
// 함수를 반환하는 고차 함수
Function<Integer, Integer> doubleFn = createMultiplier(2);
System.out.println(doubleFn.apply(5)); // 10
}
public static void repeat(int n, Consumer<Integer> action) {
for (int i = 0; i < n; i++) {
action.accept(i);
}
}
public static Function<Integer, Integer> createMultiplier(int multiplier) {
return value -> value * multiplier;
}
}
용어가 비슷해서 헷갈리지만 다른....(객체 지향 관련 개념)
일급 컬렉션 (First-class Collection)
일급 컬렉션은 컬렉션을 직접 사용하지 않고 컬렉션을 래핑하는 클래스를 만들어, 해당 클래스를 통해서만 컬렉션을 조작하도록 하는 디자인 패턴이다. 이를 통해 코드의 명확성과 유지 보수성을 높이고, 불변성을 보장할 수 있다.
불변성 보장: 일급 컬렉션 클래스는 컬렉션에 대한 직접적인 접근을 방지하고, 불변성을 유지하도록 도와준다.
비즈니스 로직 캡슐화: 컬렉션에 대한 비즈니스 로직을 일급 컬렉션 클래스 내부에 캡슐화하여 코드의 응집도를 높임.
컬렉션 관련 메서드 제공: 컬렉션을 조작하기 위한 메서드들을 일급 컬렉션 클래스에서 제공하여, 코드의 명확성을 높임.
특정 타입의 컬렉션 강제: 일급 컬렉션은 특정 타입의 컬렉션만을 다루도록 강제할 수 있다.
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
public class Products {
private final List<Product> products;
public Products(List<Product> products) {
this.products = new ArrayList<>(products);
}
// 불변성을 유지하기 위해 컬렉션 반환 시 복사본 제공
public List<Product> getProducts() {
return Collections.unmodifiableList(products);
}
// 전체 가격 계산 같은 비즈니스 로직 캡슐화
public double totalPrice() {
return products.stream()
.mapToDouble(Product::getPrice)
.sum();
}
// 제품 추가 메서드
public Products addProduct(Product product) {
List<Product> newProducts = new ArrayList<>(products);
newProducts.add(product);
return new Products(newProducts);
}
}
insert 문이 이렇게 있을 때 insert문이 1개만 나가는지, 청크 수만큼 나가는지 궁금해졌다.
insert 문이 1개만 나간다는 의미는 values 뒤로 n개 붙은 문이 한번 나가는 것이고
청크 수 만큼 나간다는 것은 insert 문 자체가 n 개 있다는 뜻.
MyBatisBatchItemWriter의 write 함수를 살펴보면 아래와 같다.
while 문으로 청크를 돌아서 sql을 만들고 들고 있다가 한 번에 실행한다.
ExecutorType.BATCH로 설정된 SqlSessionTemplate에서는, update() 메서드 호출 시 쿼리를 바로 실행하지 않고 내부 배치 큐에 저장하고 flushStatements()를 호출하면, 지금까지 배치 큐에 저장된 모든 SQL 문을 한 번에 실행
장점:
네트워크 요청 최소화: 각 SQL 문을 개별적으로 실행하지 않고, 배치로 묶어서 처리
성능 향상: 배치 처리 시 JDBC 드라이버가 여러 쿼리를 내부적으로 최적화
주의점:
메모리 사용량: 배치 큐에 저장된 쿼리가 많아질 경우 메모리 사용량이 증가
트랜잭션 관리: 배치 처리 중 하나의 쿼리가 실패하면, 전체 배치가 롤백
그럼 values 뒤로 쫙 붙여서 한번에 쏘고 싶다면?
우선 mapper를 수정하고
@Bean(INSERT_NINE_RATING_RANKING_WRITER)
@StepScope
public ItemWriter<BadukEnrichedRanking> insertNineRatingRankingWriter() {
return chunk -> {
@SuppressWarnings("unchecked") var items = (List<BadukEnrichedRanking>) chunk.getItems();
var splittedNineRankings = ListUtil.splitList(items, SPLIT_LIST_SIZE);
splittedNineRankings.forEach(badukNineRankingMapper::insertNineRankings);
};
}
MyBatisBatchItemWriter를 안 쓰고 수동으로 itemWriter를 만든 후
chunk를 sublist로 쪼갠 후 foreach 에 연결시킨다.
그러면 1 insert 의 values에 여러 개가 붙고 각 호출이 개별적인 SQL 실행을 하게 된다.
혹시 배치 방식으로 바꾸려면..
return chunk -> {
@SuppressWarnings("unchecked")
var items = (List<BadukEnrichedRanking>) chunk.getItems();
var splittedNineRankings = ListUtil.splitList(items, SPLIT_LIST_SIZE);
// Batch 처리 활성화
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
var mapper = sqlSession.getMapper(BadukNineRankingMapper.class);
splittedNineRankings.forEach(mapper::insertNineRankings);
// 배치 실행
sqlSession.flushStatements();
sqlSession.commit();
}
};
MyBatisBatchItemWriter(ExecutorType.BATCH):
ExecutorType.BATCH 모드에서는 하나의 SqlSession을 열고 여러 쿼리를 실행한 후 한 번에 flushStatements()를 호출하여 쿼리들을 모아서 데이터베이스에 전송
이 모드는 SQL 세션을 한 번만 열고, 여러 개의 쿼리를 하나의 트랜잭션 내에서 실행. 세션을 닫기 전에 모든 쿼리가 메모리에 쌓이고, flushStatements()를 호출하여 한 번에 실행되므로 성능 면에서 효율적
forEach 방식 (기본 SqlSession):
반면에 forEach를 사용하여 각각의 항목을 처리하는 경우, 매번 update 또는 insert가 실행될 때마다 SqlSession을 생성
이 방식은 각각의 쿼리가 별도의 세션을 사용하거나, 적어도 별도의 쿼리 실행이 이루어지는 방식. 즉, SQL 세션을 매번 열고 update 또는 insert를 실행한 후 세션을 닫고, 다시 열어서 쿼리를 실행하는 방식
최신 버전의 스프링부트를 쓰면 어느 새부터 아래와 같은 워닝을 만나는데 상당히 신경 쓰인다. 그동안 Page 인터페이스를 아주 많이 사용했던 터라 혹시 안되거나 deprecated 된다면 난감하기 때문이다..
찾아보니 springboot3.3부터 변경되었다고 한다!
2024-12-12 13:57:23 WARN [ration$PageModule$WarningLoggingModifier.changeProperties : 156] Serializing PageImpl instances as-is is not supported, meaning that there is no guarantee about the stability of the resulting JSON structure!
For a stable JSON structure, please use Spring Data's PagedModel (globally via @EnableSpringDataWebSupport(pageSerializationMode = VIA_DTO))
or Spring HATEOAS and Spring Data's PagedResourcesAssembler as documented in https://docs.spring.io/spring-data/commons/reference/repositories/core-extensions.html#core.web.pageables.
내용은 직렬화 시 안정적이지 않으니 Spring Data의 PagedModel 또는 PagedResourcesAssembler를 사용하여 안정적인 JSON 구조를 생성하라는 것이다.
그동안 직렬화할 때 PageImpl을 직접 직렬화하였는데, 더이상 안정적인 방식이 아니니 아래 두 방식 중 하나를 고르라는 뜻
HATEOAS를 쓰면 PagedResourcesAssembler, 그렇지 않으면 PagedModel
JSON 구조가 API의 변경이나 버전 업그레이드에 따라 변할 수 있으므로, 안정성이 떨어질 수 있음
이 모드는 이전 버전의 Spring Data에서 기본적으로 사용되던 방식
DTO
DTO(Data Transfer Object)를 사용하여 페이지 데이터를 직렬화
안정적이고 일관된 JSON 구조를 제공
DTO를 사용함으로써 페이지 데이터의 구조가 API 변경에 영향을 덜 받음
이 모드는 PagedModel 또는 PagedResourcesAssembler와 함께 사용되며, 이를 통해 클라이언트가 예측 가능한 형식의 데이터를 수신할 수 있다.
설정하고 기존과 똑같이 페이징하면 된다.
@GetMapping
public Page<BaseResponse> getPrices(
참고로 HATEOAS
HATEOAS는 REST API 설계의 원칙 중 하나로, 클라이언트가 서버 응답에 포함된 하이퍼미디어(hypermedia)를 통해 애플리케이션 상태를 동적으로 탐색할 수 있도록 하는 방식이다. 이 원칙은 REST의 자기 설명(self-descriptive) 특성을 강화한다
HATEOAS의 구성 요소
링크(Link): API 응답에 포함된 URL. 다음 가능한 액션을 안내.
상태(State): 현재 리소스의 상태.
동작(Action): 링크를 따라가면 수행할 수 있는 작업.
HATEOAS의 사용 이유
API 탐색성 증가:
클라이언트는 추가적인 문서 없이 서버 응답에 포함된 링크를 통해 어떤 작업이 가능한지 동적으로 파악할 수 있음
클라이언트-서버 결합도 감소:
클라이언트는 서버가 제공하는 링크를 따라가기 때문에 특정 엔드포인트에 강하게 의존하지 않음
유연한 확장성:
서버에서 새로운 액션이나 엔드포인트를 추가하더라도, 클라이언트는 변경 없이 새로운 기능을 사용할 수 있음
자기 설명적 API:
서버 응답에 포함된 하이퍼미디어가 클라이언트에게 리소스 상태 및 가능한 작업을 설명하므로 API의 문서화와 유지보수가 용이
O(log(N)) 복잡도: 추가, 삭제, 조회 연산의 시간 복잡도는 O(log(N))입니다.
주요 명령어:
ZADD: Sorted Set에 요소를 추가합니다.
ZRANGE: 지정한 범위의 요소를 가져옵니다.
ZREM: 요소를 삭제합니다.
ZSCORE: 요소의 점수를 확인합니다.
ZRANK: 요소의 순위를 확인합니다.
//ZADD key score member [score member ...] 추가
ZADD le/aderboard 100 alice
ZADD leaderboard 200 bob 150 charlie
//ZSCORE key member 조회
ZSCORE leaderboard alice # 결과: 100
//ZRANGE key start stop [WITHSCORES] 인덱스기반(시작; 0) 범위 조회 [점수도 같이 반환]
ZRANGE leaderboard 0 -1 WITHSCORES # 전체 조회
//ZRANGEBYSCORE key min max [WITHSCORES] 점수 기반 범위 조회
ZRANGEBYSCORE leaderboard 100 200 WITHSCORES
//ZRANK key member 0시작 순위 반환(오름차순)
ZRANK leaderboard bob # 결과: 2
//ZREVRANK key member 내림차순 순위 반환
ZREVRANK leaderboard bob # 결과: 0
//ZREM key member [member ...] 특정 맴버 삭제
ZREM leaderboard alice
활용 사례
(1) 리더보드
게임에서 점수에 따라 순위를 관리할 때 유용합니다.
점수를 score로, 사용자 이름이나 ID를 member로 저장.
순위 조회, 점수 범위 내 사용자 검색 등이 가능.
(2) 태스크 스케줄링
점수를 타임스탬프로 사용하여 작업을 스케줄링.
특정 시간 범위의 작업을 조회하거나 삭제 가능.
(3) 우선순위 큐
점수를 우선순위로 사용하여 작업을 관리.
주의점
메모리 사용량: 점수와 멤버를 함께 저장하므로 메모리 사용량이 단순 Set보다 큽니다.
점수 정밀도: 점수는 부동소수점(Floating Point)이므로 정밀도에 주의해야 합니다.
TTL 설정
Redis의 Sorted Set 자체에는 직접적인 TTL(Time To Live) 설정이 지원되지 않습니다. Redis는 Key-Value 기반으로 동작하므로, TTL은 키(key) 단위로 설정됩니다. 따라서 zset 안의 각 맴버에게 만료시간을 설정하는 것이 아닌 Sorted Set 전체에 대해 TTL을 설정해야 합니다.
TTL은 키 전체에 적용됩니다. Sorted Set의 개별 멤버에 TTL을 설정할 수는 없습니다.
개별 멤버 TTL 관리가 꼭 필요하다면, 점수를 TTL처럼 사용하거나 별도 키로 TTL 관리하는 방법이 가장 실용적입니다.
TTL 설정 후, 키이 삭제되면 Sorted Set에 저장된 모든 멤버도 함께 삭제됩니다.
TTL은 주로 일시적인 데이터에 사용됩니다. 예를 들어, 일일 리더보드나 임시 작업 큐 등에 적합합니다.
키가 이미 만료된 상태에서 접근하면, Redis는 키를 자동으로 삭제하고, 해당 키에 대한 작업은 무시됩니다.
레디스로 구현하려면..
sorted set으로 랭킹 저장하고 각 맴버별로 hash만들어서 ttl 설정한 후 노티피케이션이나 주기적으로 확인하여 set에서 삭제하는 로직 작성하여 수동으로 관리해야함..
# Sorted Set에 멤버 추가
ZADD myset 100 user1
ZADD myset 200 user2
# user1의 TTL이 필요하면 별도 키 생성 후 TTL 적용
SET user1_temp_value some_value
EXPIRE user1_temp_value 3600 # user1_temp_value 키에 TTL 1시간 설정
기존에 멀티 데이터베이스를 쓸 때 분산 트랜젝션을 위해 아래와 같이 ChainedTransaction을 사용하였는데..
@Configuration
public class ChainedTransactionConfiguration {
@Primary
@Bean(Constants.CHAINED_TRANSACTION_MANAGER)
public PlatformTransactionManager transactionManager(@Qualifier(Constants.USER_TRANSACTION_MANAGER) PlatformTransactionManager userPlatformTransactionManager,
@Qualifier(Constants.LOG_TRANSACTION_MANAGER) PlatformTransactionManager logPlatformTransactionManager,
@Qualifier(Constants.STATIC_TRANSACTION_MANAGER) PlatformTransactionManager staticPlatformTransactionManager) {
return new ChainedTransactionManager(userPlatformTransactionManager, logPlatformTransactionManager, staticPlatformTransactionManager);
}
}
아래와 같이 Deprecated 되었다.
여러 대안을 찾다가 JtaTransaction이 있어 사용가능한지 확인해 본다.
조건은
1. 멀티 데이터베이스이기 때문에 각각에 대해 단일 Transactional을 설정할 수 있어야 하고
2. 필요에 따라 복합 트랜젝션도 가능해야 한다.
우선 JtaTransaction이 뭔지 간단히 알아보자.
JtaTransactionManager는 기본적으로 여러 데이터베이스에 걸쳐 트랜잭션을 처리하는 역할을 한다. 그러나 이를 제대로 활성화하려면 다음과 같은 조건을 충족해야 한다:
XA 데이터 소스 설정: 분산 트랜잭션을 사용하려면 XADataSource를 사용해야 한다. 예를 들어, MySQL을 사용할 경우 MysqlXADataSource를 사용해야 하며, 다른 데이터베이스도 XA 지원을 해야 한다.
트랜잭션 관리자의 설정: JtaTransactionManager는 기본적으로 JTA를 사용하여 트랜잭션을 관리하지만, 분산 트랜잭션을 활성화하려면 여러 데이터 소스를 연결하고 이를 관리할 수 있는 TransactionManager 설정이 필요하다.
@Configuration
public class JtaDbConfig {
@Bean(name = "chainedTransaction")
public JtaTransactionManager transactionManager() {
JtaTransactionManager transactionManager = new JtaTransactionManager();
// JTA 트랜잭션 매니저 설정
return transactionManager;
}
// DataSource 1 설정 (XA DataSource)
@Bean
public DataSource dataSource1() {
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/db1");
dataSource.setUser("user1");
dataSource.setPassword("pass1");
return dataSource;
}
// DataSource 2 설정 (XA DataSource)
@Bean
public DataSource dataSource2() {
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/db2");
dataSource.setUser("user2");
dataSource.setPassword("pass2");
return dataSource;
}
// EntityManagerFactory 설정 (각각의 데이터베이스용)
@Bean(name = "entityManagerFactory1")
public LocalContainerEntityManagerFactoryBean entityManagerFactory1(EntityManagerFactoryBuilder builder) {
return builder
.dataSource(dataSource1())
.packages("com.example.entity1")
.build();
}
@Bean(name = "entityManagerFactory2")
public LocalContainerEntityManagerFactoryBean entityManagerFactory2(EntityManagerFactoryBuilder builder) {
return builder
.dataSource(dataSource2())
.packages("com.example.entity2")
.build();
}
}
JtaTransactionManager 도 단일 트랜젝션 관리 가능한가?
JTA 트랜잭션 관리의 범위:
JtaTransactionManager는 분산 트랜잭션(XA 트랜잭션)을 관리하는 데 최적화되어 있지만, 단일 데이터 소스에서의 트랜잭션도 처리할 수 있음
단일 데이터 소스만 사용할 경우에도 JTA 프로토콜을 통해 트랜잭션이 시작되고 종료됨
단일 트랜잭션 시 처리 동작:
단일 데이터 소스에서 JtaTransactionManager는 해당 데이터 소스에서의 트랜잭션을 관리함
단일 데이터 소스 환경에서는 JpaTransactionManager나 DataSourceTransactionManager처럼 작동함
단일 트랜잭션 환경에서는 JpaTransactionManager나 DataSourceTransactionManager가 더 효율적일 수 있다. 이는 JTA 오버헤드가 없기 때문
분산 트랜잭션이 필요 없는 경우 굳이 JtaTransactionManager를 사용할 필요는 없음
참고:
JpaTransactionManager는 JPA에 특화되어 있으며, 트랜잭션이 하나의 데이터베이스일 경우에 적합
JtaTransactionManager는 JTA를 지원하며, XA 데이터 소스를 사용하는 분산 트랜잭션을 관리할 수 있음
JtaTransactionManager는 단일 트랜잭션도 처리할 수 있지만, 분산 트랜잭션이 필요 없는 경우에는 더 가벼운 트랜잭션 매니저(JpaTransactionManager 또는 DataSourceTransactionManager)를 사용하는 것이 더 효율적임. 하지만 프로젝트 환경에서 단일 및 분산 트랜잭션이 모두 필요하다면 JtaTransactionManager를 사용해 통합적으로 관리 가능
그럼 단일 트랜젝션이 필요할 경우 더 가볍게 설정할 수는 없을까?
그거슨 불가..
JtaTransactionManager가 기본적으로 JTA 규격에 따라 동작하며, 트랜잭션의 범위는 리소스에 따라 자동으로 결정되기 때문이다..
정 필요하면 아래처럼 JpaTransactionManager / JtaTransactionManager 각각 만들어서 필요에 따라 transactionManager를 지정하는 방법뿐.. 이라는데 이건 좀 아닌 듯....
@Configuration
public class DataSourceConfig {
// 첫 번째 데이터베이스 - 단일 트랜잭션용
@Bean(name = "dataSource1Hikari")
public DataSource dataSource1Hikari() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db1");
dataSource.setUsername("user1");
dataSource.setPassword("password1");
return dataSource;
}
// 첫 번째 데이터베이스 - 분산 트랜잭션용
@Bean(name = "dataSource1XA")
public DataSource dataSource1XA() {
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://localhost:3306/db1");
xaDataSource.setUser("user1");
xaDataSource.setPassword("password1");
return xaDataSource;
}
// 두 번째 데이터베이스 - 단일 트랜잭션용
@Bean(name = "dataSource2Hikari")
public DataSource dataSource2Hikari() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db2");
dataSource.setUsername("user2");
dataSource.setPassword("password2");
return dataSource;
}
// 두 번째 데이터베이스 - 분산 트랜잭션용
@Bean(name = "dataSource2XA")
public DataSource dataSource2XA() {
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://localhost:3306/db2");
xaDataSource.setUser("user2");
xaDataSource.setPassword("password2");
return xaDataSource;
}
}
@Configuration
public class TransactionManagerConfig {
// 첫 번째 데이터베이스 - 단일 트랜잭션
@Bean(name = "transactionManager1")
public DataSourceTransactionManager transactionManager1(
@Qualifier("dataSource1Hikari") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
// 두 번째 데이터베이스 - 단일 트랜잭션
@Bean(name = "transactionManager2")
public DataSourceTransactionManager transactionManager2(
@Qualifier("dataSource2Hikari") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
// JTA 트랜잭션 매니저 (분산 트랜잭션 관리)
@Bean(name = "jtaTransactionManager")
public JtaTransactionManager jtaTransactionManager(
@Qualifier("dataSource1XA") DataSource dataSource1XA,
@Qualifier("dataSource2XA") DataSource dataSource2XA) {
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransactionImp userTransactionImp = new UserTransactionImp();
return new JtaTransactionManager(userTransactionImp, userTransactionManager);
}
}
@Service
public class DbService {
@Transactional(transactionManager = "transactionManager1")
public void performDb1Operation() {
// 첫 번째 데이터베이스 트랜잭션 작업
}
@Transactional(transactionManager = "transactionManager2")
public void performDb2Operation() {
// 두 번째 데이터베이스 트랜잭션 작업
}
@Transactional(transactionManager = "jtaTransactionManager")
public void performMultiDbOperation() {
// DB1과 DB2를 조율하는 분산 트랜잭션 작업
}
}
JTA 프로토콜: 2단계 커밋 (2PC)
JTA는 2PC (Two-Phase Commit) 프로토콜을 사용하여 분산 트랜잭션의 원자성과 일관성을 보장한다. 이 프로토콜은 다음 두 단계를 포함한다.
1단계: Prepare
트랜잭션 관리자(Transaction Manager)는 모든 자원 관리자(XAResource)에 "Prepare" 메시지를 보냄
각 자원 관리자는 트랜잭션을 준비하고, 성공 여부를 반환(예: VoteCommit 또는 VoteRollback)
2단계: Commit or Rollback
모든 자원이 VoteCommit을 반환하면 트랜잭션 관리자는 "Commit" 메시지를 보내 트랜잭션을 커밋
하나라도 VoteRollback을 반환하면 모든 자원에 "Rollback" 메시지를 보내 트랜잭션을 롤백
JTA (Java Transaction API)
javax.transaction 패키지
UserTransaction, TransactionManager, XAResource 같은 인터페이스를 제공
트랜잭션을 시작/종료하는 표준 방식 정의
스프링, Java EE, Jakarta EE에서 트랜잭션을 추상화할 때 사용
단독으로 동작 X → 구현체가 필요함
예: Atomikos, Bitronix, Narayana 등이 JTA를 구현
Atomikos
JTA 구현체이자 분산 트랜잭션 관리 라이브러리
다중 데이터소스 (예: MySQL + Oracle), 메시지 브로커 (Kafka, JMS) 등 여러 리소스에 걸친 트랜잭션을 지원
XA 프로토콜 기반의 **2PC (Two-Phase Commit)**을 수행
트랜잭션 복구, 타임아웃, 로그 등 고급 기능 포함
Spring Boot나 Spring에서 사용하려면 AtomikosJtaPlatform, AtomikosDataSourceBean 설정 필요
즉,
JTA = 인터페이스
Atomikos = 구현체 (라이브러리)
스프링에서는 JtaTransactionManager를 사용하고, 실제 구현체로 Atomikos를 등록해 줌
ForkJoinPool은 Java 7에 도입된 병렬 처리 프레임워크로, 작업을 작은 단위로 분할(fork)하고 병렬로 처리한 후 다시 합치는(join) 방식으로 동작한다. 병렬 프로그래밍과 작업 스케줄링을 위한 강력한 도구로, 특히 대규모 데이터 처리나 계산 집약적인 작업에 유용하다.
이게 프레임워크?
고수준의 작업 관리: ForkJoinPool은 작업 스케줄링, 워크 스틸링, 병렬 처리 등을 관리하는 메커니즘을 제공한다. 개발자가 세부적인 스레드 관리나 큐 처리 등을 직접 코딩하지 않아도 된다.
제어 역전 (IoC, Inversion of Control): 작업 실행과 스레드 관리는 ForkJoinPool이 수행하며, 개발자는 작업의 논리만 작성
작업 분할 및 병합 전략:RecursiveTask와 RecursiveAction이라는 추상 클래스를 기반으로 작업을 설계하며, 내부적으로는 효율적인 작업 분할 및 병합을 자동으로 처리한다.
워크 스틸링 (Work Stealing): 스레드 풀에서 작업 큐를 관리하며, 비활성 스레드가 다른 활성 스레드의 큐에서 작업을 가져와 실행하는 동적 작업 분배를 한다.(처리량을 최적화); 개발자가 구현할 필요 없이 forkjoinpool이 자동으로 처리
표준화된 인터페이스: 개발자가 사용할 수 있는 명확한 API (invoke, submit, fork, join 등)를 제공. 이로 인해 복잡한 병렬 프로그래밍을 간단히 구현할 수 있음.
장점
멀티코어를 활용하여 작업을 병렬로 처리하므로 CPU 사용률이 최적화
워크 스틸링을 통해 비효율적인 작업 분배를 방지
단점
작업 분할 및 병합에 대한 오버헤드가 존재
I/O 중심 작업에서는 비효율적이며, CPU 집약적인 작업에 적합
적합한 상황
데이터가 많고, 병렬로 처리할 수 있는 작업
재귀작업/반복적으로 작업을 나눌 수 있을 때 (예: 합계, 정렬)
CPU 집약적인 작업에서 최적의 성능을 얻고자 할 때
개발 시 전체적인 흐름
큰 작업이면 Fork하여 병렬 처리.
작은 작업이면 직접 계산으로 효율적 처리.
모든 계산이 끝나면 병렬 결과를 Join하여 최종 결과를 얻음.
작은 작업은 직접 계산하는 이유
작업 분할의 비용 문제:
Fork/Join Framework는 큰 작업을 작은 작업으로 나누고 각 작업을 병렬적으로 실행
하지만 작업을 너무 많이 나누면작업 분할과 작업 병합(merge)에 드는 오버헤드(비용)가 커질 수 있음
작은 작업에 대해서는 작업 분할을 하지 않고 직접 계산하여 오버헤드를 줄임
효율성 최적화:
일정 크기 이하의 작업은 더 이상 병렬로 처리할 필요가 없으므로 직접 계산이 더 효율적
예를 들어, 배열의 일부를 합산하거나 특정 범위의 숫자를 더하는 간단한 작업이라면, 병렬처리 대신 반복문을 통해 순차적으로 계산하는 것이 빠름
ForkJoinPool의 주요 메서드
invoke(ForkJoinTask<?> task): 기다리고 결과를 받음
execute(ForkJoinTask<?> task): 작업을 비동기로 실행
submit(ForkJoinTask<?> task): 작업을 실행하고 Future를 반환
ForkJoinPool 개발 시 RecursiveTask와 RecursiveAction의 역할
RecursiveTask<V>:
반환값이 있는 병렬 작업을 정의할 때 사용
작업을 분할하고 결과를 합산하여 반환(compute() 메서드)
RecursiveAction:
반환값이 없는 병렬 작업을 정의할 때 사용.
단순히 작업을 수행하고 결과를 반환하지 않는 경우 적합(compute() 메서드)
꼭 써야해?
RecursiveTask를 상속하지 않고도 직접ForkJoinTask또는Runnable과 같은 인터페이스를 사용할 수 있다. 하지만 이는 더 복잡하고 비효율적이며 코드 복잡성을 증가시킨다.
ForkJoinPool
스레드 갯수를 생략하면, 기본적으로가용한 CPU 코어 수에 따라 동작
스레드 수 = Runtime.getRuntime().availableProcessors() 즉, 현재 시스템의 CPU 코어 수(논리적 코어 포함)가 기본 스레드 수로 사용됨
ForkJoinPool pool = new ForkJoinPool(); //내부적으로 가용한 프로세서 수를 기반으로 스레드 풀 크기를 결정
ForkJoinPool pool = new ForkJoinPool(4); // 스레드 4개 사용
예시
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
static class SumTask extends RecursiveTask<Integer> {
private final int[] arr;
private final int start, end;
private static final int THRESHOLD = 10;
public SumTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 작은 작업은 직접 계산
int sum = 0;
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else {
// 작업 분할
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(arr, start, mid);
SumTask rightTask = new SumTask(arr, mid, end);
leftTask.fork(); // 병렬 처리
int rightResult = rightTask.compute(); // 동기 처리
int leftResult = leftTask.join(); // 병합
return leftResult + rightResult;
}
}
}
public static void main(String[] args) {
int[] arr = new int[100];
for (int i = 0; i < arr.length; i++) arr[i] = i + 1;
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(arr, 0, arr.length);
int result = pool.invoke(task);
System.out.println("Sum: " + result); // 출력: Sum: 5050
}
}
1. leftTask.fork();
작업 분할:
leftTask를 병렬로 처리할 수 있도록 Fork-Join Pool에 작업 큐로 제출
fork() 메서드는 현재 작업을 Fork-Join Pool의 스레드가 처리하도록 요청하며, 비동기적으로 실행
이 시점에서 leftTask는 아직 결과를 계산하지 않음
2. int rightResult = rightTask.compute();
동기 실행:
rightTask는 직접 현재 스레드에서 동기적으로 계산
compute() 메서드는 RecursiveTask에서 작업을 처리하는 메인 로직
이렇게 함으로써 하나의 스레드가 rightTask를 바로 계산하여, 자원을 최대한 활용
3. int leftResult = leftTask.join();
결과 병합:
join() 메서드는 leftTask가 완료될 때까지 대기하고 결과를 반환
만약 leftTask가 이미 완료되었으면, 바로 결과를 반환
이를 통해 leftTask와 rightTask의 결과를 병합
왜 이런 방식으로 처리?
자원의 효율적 활용:
leftTask는 병렬로 실행하도록 요청 (fork())
한편, rightTask는 현재 스레드에서 처리 (compute())
이렇게 하면 다른 작업 스레드가 leftTask를 처리하는 동안, 현재 스레드가 놀지 않고 rightTask를 계산하여 자원을 최대한 활용
병렬성과 동기화의 조합:
fork()로 비동기 작업을 시작하고 join()으로 결과를 기다리며 동기화.
병렬성과 동기화의 균형을 유지하면서 성능을 최적화
ForkJoinPool
특징:
Java 7에서 도입.
작업 분할(divide-and-conquer)을 기반으로 병렬 처리를 수행.
Work-Stealing 알고리즘을 사용해 작업이 끝난 스레드가 다른 스레드의 작업을 훔쳐 효율성을 높임.
주로 재귀적인 작업 처리나 작업 분할에 사용.
RecursiveTask(결과 반환)와 RecursiveAction(결과 없음)을 통해 작업 정의.