반응형

https://docs.spring.io/spring-data/jpa/docs/current/reference/html/#repositories.query-streaming

 

Spring Data JPA - Reference Documentation

Example 109. Using @Transactional at query methods @Transactional(readOnly = true) interface UserRepository extends JpaRepository { List findByLastname(String lastname); @Modifying @Transactional @Query("delete from User u where u.active = false") void del

docs.spring.io

몇몇 spring-jpa 버전과 java8 이상을 동시에 사용하면 return type으로 Stream<T>을 사용할 수 있다.

 

기존의 Collection interface를 사용할 때와는 어떤 차이점이 있을까.

1. List

  • List는 데이터베이스에서 조회된 결과를 한 번에 메모리(힙)에 모두 로드하여 반환한다. (eagerly loading). 
  • JPA에서 List<T>는 결과를 모두 즉시 로딩하고, 그 결과를 메모리에서 다룬다.

장점:

  • 단순성: 모든 데이터를 한꺼번에 메모리에 로드하므로 사용하기 쉽다.
  • 즉시 사용 가능: 결과 데이터를 바로 사용하거나 순회할 수 있다.

단점:

  • 메모리 사용량: 결과가 크면 모든 데이터를 한꺼번에 메모리에 로드하기 때문에 메모리 사용량이 많을 수 있음.(OOM주의)
  • 성능: 대용량 데이터를 처리할 때 성능 저하가 발생할 수 있다.
    • 이 모든 절차가 끝나야지만 client response를 받을 수 있다. 그리고 DB 연결도 바로 끊어진다. DB 연결(persistence)의 유지를 원할 경우, @Transactional을 줘야 한다.

Stream

  • Stream은 데이터베이스에서 조회된 결과를 **지연 로딩(lazy loading)**으로 처리할 수 있다.
  • JPA에서 Stream<T>는 결과를 스트리밍 방식으로 처리하여, 데이터가 필요할 때마다 하나씩 처리된다.
  •  background batch job, webflux(reactive programming) 등에 사용 가능하다. 

장점:

  • 메모리 효율성: 결과를 한꺼번에 메모리에 올리지 않고 필요한 만큼 순차적으로 처리하므로, 대용량 데이터를 처리할 때 유리하다. (lazy loading)
  • 지연 처리: 스트림은 지연된 방식으로 데이터를 처리하므로, 일부 결과만 필요한 경우 효율적이다.

단점:

  • 리소스 관리: Stream은 데이터베이스 연결을 유지한 상태로 동작하므로, 사용 후 반드시 닫아줘야 한다(.close()).
  • 즉시 사용 불가: List와 달리 스트림은 즉시 모든 데이터를 사용할 수 없고, 순차적으로 처리해야 함
  • stream을 사용할 경우 @Transactional이 필수다.
///////////////// repository
@Query(value = "select " +
        "JSON_EXTRACT(data, '$.strClazz') as clazz, "+
        "JSON_EXTRACT(data, '$.totalScore') as totalScore "+
        "from table_event e " +
        "where e.gid = :#{#req.gid} " +
        "and e.event_id = :#{#req.eventId} " +
        "order by e.base_date desc ",
        nativeQuery = true)
Stream<DoubleWheelUserRes> getDoubleWheelUserInfosOrderByBaseDate(CoolTimeEventReq req, Pageable pageable);

default DoubleWheelUserRes getTop1DoubleWheelUserInfo(CoolTimeEventReq req){
    try(Stream<DoubleWheelUserRes> res = getDoubleWheelUserInfosOrderByBaseDate(req, PageRequest.of(0, 1))){
        return res.findFirst().orElse(new DoubleWheelUserRes() {
            @Override
            public String getClazz() {
                return null;
            }

            @Override
            public String getTotalScore() {
                return null;
            }
        });
    }
}
    
///////////////// service
@Transactional(value = "aTransactionManager", readOnly = true)
public DoubleWheelUserRes getDoubleWheelUserInfo(CoolTimeEventReq req){
    return userEventRepository.getTop1DoubleWheelUserInfo(req);
}

Java의 Stream은 AutoCloseable을 구현하고 있습니다. 따라서 try-with-resources 문을 사용할 수 있다.

구체적으로 말하면, Java 8에서 도입된 Stream 인터페이스는 BaseStream 인터페이스를 상속하고 있는데, 이 BaseStream이 AutoCloseable 인터페이스를 구현하고 있다. 그래서 Stream을 try-with-resources 문에서 사용하면 자동으로 close() 메서드가 호출되어 리소스를 해제한다.

try (Stream<User> userStream = userRepository.streamAll()) { ///stream close
    userStream.forEach(user -> {
        // 사용자 처리
    });
}

///
@Query("SELECT u FROM User u")
Stream<User> streamAll();

 

Stream과 트랜잭션의 관계

  1. Stream은 지연 로딩을 기반으로 동작: 커서방식
    • JPA에서 **Stream<T>**는 지연 로딩(lazy loading) 방식으로 데이터를 가져온다. 즉, 데이터를 한 번에 모두 로드하는 대신, 필요할 때마다 데이터를 순차적으로 가져오는 방식이다.
    • 이 과정에서 데이터베이스 연결이 열려 있어야 스트리밍을 통해 데이터베이스에서 데이터를 지속적으로 읽어올 수 있다.
  2. @Transactional은 트랜잭션을 유지:
    • 트랜잭션이 활성화되면, JPA는 **데이터베이스 연결(session)**을 트랜잭션이 끝날 때까지 유지한다.
    • **Stream**은 트랜잭션이 끝날 때까지 데이터를 계속해서 가져와야 하므로, 트랜잭션이 열려 있는 상태에서 스트리밍 작업을 수행해야 한다.
    • 만약 트랜잭션이 열려 있지 않으면, 데이터베이스 연결이 닫히고, LazyInitializationException 같은 오류가 발생할 수 있다. 이는 트랜잭션 외부에서 지연 로딩을 시도했을 때 발생하는 오류이다.
    • 트랜잭션 타임아웃: 트랜잭션이 설정된 시간 내에 완료되지 않으면 타임아웃이 발생해 트랜잭션이 롤백됩니다.
    • 데이터베이스 커넥션 유지 문제: 커서 방식으로 데이터베이스 커넥션을 오랜 시간 열어둬야 하기 때문에, 데이터베이스 연결 자원이 부족해질 수 있습니다.
    • 락 경합: 트랜잭션이 너무 오래 유지되면, 다른 트랜잭션들이 동일한 데이터를 수정하려고 할 때 **락 경합(lock contention)**이 발생할 수 있습니다.
  3. @Transactional 없이 사용할 경우:
    • 트랜잭션이 열리지 않으면, 데이터베이스 세션이 닫히기 때문에 스트리밍이 중간에 끊기고 예외가 발생할 가능성이 높다.
    • 따라서 Stream을 사용해 데이터를 처리할 때는 트랜잭션이 유지되어야만 안전하게 데이터베이스와의 연결을 유지하면서 지연 로딩을 통한 스트리밍이 가능다.
@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Transactional(readonly=true, timeout = 300) // 트랜잭션 타임아웃을 300초로 설정
    public void processUsers() {
        try (Stream<User> userStream = userRepository.findAllUsersByStream()) {
            userStream.forEach(user -> {
                // 유저 처리 로직
                System.out.println(user.getName());
            });
        }
    }
}
728x90
반응형

+ Recent posts