728x90
반응형
728x90
반응형
반응형

1. transaction commit 순서

spring jpa에서 기본으로 제공하는 repository.delete() / repository.save()를 사용할 때에

https://docs.jboss.org/hibernate/orm/4.2/javadocs/org/hibernate/event/internal/AbstractFlushingEventListener.html

 

AbstractFlushingEventListener (Hibernate JavaDocs)

protected  void performExecutions(EventSource session)           Execute all SQL (and second-level cache updates) in a special order so that foreign-key constraints cannot be violated: Inserts, in the order they were performed Updates Deletion

docs.jboss.org

한 트랜젝션 안에서 delete -> insert 순으로 작업이 있어도 fk제약조건 때문에 insert -> delete 순으로 진행된다.

외래 키 제약 조건이란?
외래 키 값은 다른 테이블의 기본키 값들 중에 하나여야 함.
즉, 없는 데이터를 참조해서 외래 키로 쓰면 안 됨.

이 문제는 특히 엔티티 간의 연관 관계가 설정되어 있고, 외래 키(Foreign Key) 제약 조건이 걸려 있는 경우 발생할 수 있습니다.

 

단이는 spring jpa에서 기본으로 제공하는 repository.delete() / repository.save()를 사용할 때에 그렇고 @Query 어노테이션을 이용하여 커스텀 쿼리를 줄 경우는 또 안 그렇다..

해결법

1. 수동 플러시 사용

Hibernate가 SQL을 자동으로 처리하지 않도록 수동으로 플러시(flush())와 클리어(clear())를 사용하여 정확한 순서로 쿼리를 강제할 수 있습니다.

@Transactional
public void deleteAndInsert() {
    // 기존 엔티티 삭제
    myEntityRepository.deleteAll();

    // 엔티티 매니저 수동 플러시
    entityManager.flush(); 
    entityManager.clear(); 

    // 새로운 엔티티 삽입
    List<MyEntity> newEntities = new ArrayList<>();
    // newEntities에 데이터 추가
    myEntityRepository.saveAll(newEntities);
}
  • deleteAll() 후에 entityManager.flush()를 호출하면, DELETE SQL이 즉시 실행되어 데이터베이스에서 삭제됩니다.
  • entityManager.clear()는 Persistence Context를 비워 엔티티 상태를 초기화합니다.

 

2. Cascade 설정 확인

엔티티 간 연관 관계에서 cascade 설정이 문제를 일으킬 수 있습니다. 예를 들어, CascadeType.ALL 또는 CascadeType.REMOVE가 설정되어 있는 경우 Hibernate가 엔티티의 라이프사이클을 관리하기 위해 예상치 못한 순서로 INSERT 또는 DELETE를 수행할 수 있습니다.

연관된 엔티티의 cascade 옵션을 다음과 같이 필요한 옵션만 사용하여 설정합니다.

@OneToMany(mappedBy = "parent", cascade = {CascadeType.PERSIST, CascadeType.MERGE})
private List<Child> children;

3. JPQL 또는 Native Query 사용

JPA가 아닌 순수 SQL 쿼리를 사용하여 필요한 순서로 작업을 수행할 수도 있습니다.

@Modifying
@Query("DELETE FROM MyEntity e WHERE e.parent.id = :parentId")
void deleteByParentId(@Param("parentId") Long parentId);

@Modifying
@Query(value = "INSERT INTO my_entity (id, name, parent_id) VALUES (:id, :name, :parentId)", nativeQuery = true)
void insertEntity(@Param("id") Long id, @Param("name") String name, @Param("parentId") Long parentId);

이 방법을 사용하면, 외래 키 제약 조건을 위반하지 않도록 원하는 순서로 DELETE와 INSERT를 수행할 수 있습니다.

4. 트랜잭션 분리

DELETE와 INSERT를 별도의 트랜잭션으로 처리하는 방법도 있습니다. 이렇게 하면 한 트랜잭션에서 삭제된 후, 다른 트랜잭션에서 삽입이 되므로 외래 키 문제를 피할 수 있습니다.

 

2. Transactional AOP

같은 클래스 내에서 두 메서드에 각각 @Transactional이 있고, 한 메서드가 다른 메서드를 호출할 때(transactional 중첩), REQUIRES_NEW를 propagation을 사용해서 호출하더라도 해당 트랜젝션은 첫 번째 트랜젝션에 다 물린다.

다른 트랜젝션으로 처리해야 한다면 다른 클래스에 @Transactional이 있는 메서드를 호출해야 한다.

이유는 spring AOP는 프락시는 객체 단위로 감싸 지기 때문에 같은 클래스 안의 다른 함수도 결국 같은 프락시에 있어서 그렇다.

참고) 지금 로직이 transaction안에 들어있는지 확인

transactionstatus를 조회해서 같은 id 인지, active 인지 확인 가능하다.

You can check if transaction is active using 
TransactionSynchronizationManager.isActualTransactionActive().
But you should call it before a service method executing.

Also you can get status of current transaction using
TransactionStatus status = TransactionAspectSupport.currentTransactionStatus();

Besides, maybe a good point for you is to enable logging of transactions.
log4j.logger.org.hibernate.transaction=DEBUG
log4j.logger.org.springframework.transaction=DEBUG

2024.05.23 - [개발/spring] - [jpa] transaction propagation

 

[jpa] transaction propagation

2024.05.23 - [개발/spring] - [transaction] isolation level [jpa] transaction isolation level2024.05.22 - [개발/sql] - DB isolation level DB isolation levelisolation level 이란 무엇인가?디비 동시성을 관리하고 데이터 정합성을 유

bangpurin.tistory.com

 

3. save vs saveAll

100개의 entity를 저장하고자 할 때, save를 이용할 때와 saveAll을 이용할 때 어떤 것이 성능이 더 좋을까? 답은 saveAll이다.

@Transactional
@Override
public <S extends T> S save(S entity) {

    Assert.notNull(entity, "Entity must not be null.");

    if (entityInformation.isNew(entity)) {
        em.persist(entity);
        return entity;
    } else {
        return em.merge(entity);
    }
}

/*
 * (non-Javadoc)
 * @see org.springframework.data.jpa.repository.JpaRepository#save(java.lang.Iterable)
 */
@Transactional
@Override
public <S extends T> List<S> saveAll(Iterable<S> entities) {

    Assert.notNull(entities, "Entities must not be null!");

    List<S> result = new ArrayList<S>();

    for (S entity : entities) {
        result.add(save(entity));
    }

    return result;
}

내부 로직을 살펴보면 saveAll도 결국 for loop을 돌면서 save를 호출하는 것이니 같은 로직이라고 생각할 수 있다.

하지만 transaction중심으로 보자면 달라진다. saveAll과 save 모두 @Transactional 어노테이션이 있는 것을 볼 수 있는데, 이 경우 밖에 있는 트랜젝션으로 유지된다(추가적인 트랜젝션이 생성되지 않는다). 위에서 설명한 spring AOP 원리에 의거한다. 

REQUIRED is the default propagation. Spring checks if there is an active transaction, and if nothing exists, it creates a new one. Otherwise, the business logic appends to the currently active transaction

즉 save 100번은 100번의 트랜젝션이 생겼다 없어졌다 하는 것이고, saveAll은 하나의 트랜젝션에서 100번 저장하는 것이다. 여기서 속도 차이가 나오게 된다.

 

그렇다면, 서비스단에서 @Transactional을 걸고 save 100번과, @Transactional 없이 saveAll 100번 한 성능은 비슷할까?

이론상으로는 비슷해야 하는데 테스트해봐야겠다.

++ 여기서 mysql 디비를 사용하면 사실 saveAll이 진짜 배치 인서트가 아닐 수 있다. 특히 id를 자동생성하면 더더욱!

GenerationType.IDENTITY Forces Synchronous Insert

id를 생성해서 다시 어플리케이션으로 내려주고 스프링은 그걸 받아서 세팅한 후 save all을 하는 것이라.. 디비 호출이 딱히 줄지 않아 성능상 이점이 없다. 이를 해결하기 위해서는 Sequence 방식으로 id 생성 전략을 바꿔야한다..

 


관련 글:

2022.03.30 - [개발/spring] - [transaction] rollback works only for unchecked exception

 

[transaction] rollback works only for unchecked exception

2022.03.16 - [개발/spring] - [spring] ChainedTransactionManager deprecated from boot 2.5 [spring] ChainedTransactionManager deprecated from boot 2.5 한 트랜젝션에서 서로 다른 DB에 update/insert 작업..

bangpurin.tistory.com

2022.05.27 - [개발/spring] - [spring-jpa] 부모-자식 트랜젝션 관계(propagation)

 

[spring-jpa] 부모-자식 트랜젝션 관계(propagation)

@Transactional 하위에 또 다른 @Transactional을 준다면? REQUIRED is the default propagation. Spring checks if there is an active transaction, and if nothing exists, it creates a new one. Otherwise,..

bangpurin.tistory.com

 

2022.01.27 - [개발/spring] - [jpa] 영속성 컨텍스트 in spring

 

[jpa] 영속성 컨텍스트 in spring

영속성 컨텍스트? 엔티티를 영구 저장하는 환경으로 애플리케이션과 DB 사이에서 객체를 보관하는 가상의 데이터베이스 같은 역할을 한다. 엔티티 매니저를 통해 엔티티를 저장하거나 조회하

bangpurin.tistory.com

2022.01.27 - [개발/spring] - [jpa] OSIV란; spring.jpa.open-in-view

 

[jpa] OSIV란; spring.jpa.open-in-view

OSIV에 대한 이해를 하려면 영속성 컨텍스트가 뭔지 알아야한다. 이전 글을 참고하자. 2022.01.27 - [개발/spring] - [jpa] 영속성 컨텍스트 in spring [jpa] 영속성 컨텍스트 in spring 영속성 컨텍스트? 엔티티.

bangpurin.tistory.com

 

https://eocoding.tistory.com/94

 

@Transactional 분리가 안되는 이유 / 실험을 통해 트랜잭션 전파 유형과 Spring AOP 이해

이번 포스팅은 올해 5월에 올렸던 포스팅에 대한 후속 포스팅이다. https://eocoding.tistory.com/74 @Transactional에서 JPA로 delete한 뒤 insert가 안될 때, duplicate entry 에러가 날 때 해결하기 일단 원인..

eocoding.tistory.com

https://2dongdong.tistory.com/29

 

Spring Data JPA Save(insert) 속도 최적화

대량의 데이터를 삽입하는 상황이 생겼습니다. 초창기에는 JPA Save함수를 반복문을 통해 호출해서 저장하게 구현을 했는데요. 처음에는 괜찮았으나, 삽입 할 데이터가 점점 많아지면서 삽입 시

2dongdong.tistory.com

 

728x90
반응형
반응형

https://docs.spring.io/spring-data/jpa/docs/current/reference/html/#repositories.query-streaming

 

Spring Data JPA - Reference Documentation

Example 109. Using @Transactional at query methods @Transactional(readOnly = true) interface UserRepository extends JpaRepository { List findByLastname(String lastname); @Modifying @Transactional @Query("delete from User u where u.active = false") void del

docs.spring.io

몇몇 spring-jpa 버전과 java8 이상을 동시에 사용하면 return type으로 Stream<T>을 사용할 수 있다.

 

기존의 Collection interface를 사용할 때와는 어떤 차이점이 있을까.

1. List

  • List는 데이터베이스에서 조회된 결과를 한 번에 메모리(힙)에 모두 로드하여 반환한다. (eagerly loading). 
  • JPA에서 List<T>는 결과를 모두 즉시 로딩하고, 그 결과를 메모리에서 다룬다.

장점:

  • 단순성: 모든 데이터를 한꺼번에 메모리에 로드하므로 사용하기 쉽다.
  • 즉시 사용 가능: 결과 데이터를 바로 사용하거나 순회할 수 있다.

단점:

  • 메모리 사용량: 결과가 크면 모든 데이터를 한꺼번에 메모리에 로드하기 때문에 메모리 사용량이 많을 수 있음.(OOM주의)
  • 성능: 대용량 데이터를 처리할 때 성능 저하가 발생할 수 있다.
    • 이 모든 절차가 끝나야지만 client response를 받을 수 있다. 그리고 DB 연결도 바로 끊어진다. DB 연결(persistence)의 유지를 원할 경우, @Transactional을 줘야 한다.

Stream

  • Stream은 데이터베이스에서 조회된 결과를 **지연 로딩(lazy loading)**으로 처리할 수 있다.
  • JPA에서 Stream<T>는 결과를 스트리밍 방식으로 처리하여, 데이터가 필요할 때마다 하나씩 처리된다.
  •  background batch job, webflux(reactive programming) 등에 사용 가능하다. 

장점:

  • 메모리 효율성: 결과를 한꺼번에 메모리에 올리지 않고 필요한 만큼 순차적으로 처리하므로, 대용량 데이터를 처리할 때 유리하다. (lazy loading)
  • 지연 처리: 스트림은 지연된 방식으로 데이터를 처리하므로, 일부 결과만 필요한 경우 효율적이다.

단점:

  • 리소스 관리: Stream은 데이터베이스 연결을 유지한 상태로 동작하므로, 사용 후 반드시 닫아줘야 한다(.close()).
  • 즉시 사용 불가: List와 달리 스트림은 즉시 모든 데이터를 사용할 수 없고, 순차적으로 처리해야 함
  • stream을 사용할 경우 @Transactional이 필수다.
///////////////// repository
@Query(value = "select " +
        "JSON_EXTRACT(data, '$.strClazz') as clazz, "+
        "JSON_EXTRACT(data, '$.totalScore') as totalScore "+
        "from table_event e " +
        "where e.gid = :#{#req.gid} " +
        "and e.event_id = :#{#req.eventId} " +
        "order by e.base_date desc ",
        nativeQuery = true)
Stream<DoubleWheelUserRes> getDoubleWheelUserInfosOrderByBaseDate(CoolTimeEventReq req, Pageable pageable);

default DoubleWheelUserRes getTop1DoubleWheelUserInfo(CoolTimeEventReq req){
    try(Stream<DoubleWheelUserRes> res = getDoubleWheelUserInfosOrderByBaseDate(req, PageRequest.of(0, 1))){
        return res.findFirst().orElse(new DoubleWheelUserRes() {
            @Override
            public String getClazz() {
                return null;
            }

            @Override
            public String getTotalScore() {
                return null;
            }
        });
    }
}
    
///////////////// service
@Transactional(value = "aTransactionManager", readOnly = true)
public DoubleWheelUserRes getDoubleWheelUserInfo(CoolTimeEventReq req){
    return userEventRepository.getTop1DoubleWheelUserInfo(req);
}

Java의 Stream은 AutoCloseable을 구현하고 있습니다. 따라서 try-with-resources 문을 사용할 수 있다.

구체적으로 말하면, Java 8에서 도입된 Stream 인터페이스는 BaseStream 인터페이스를 상속하고 있는데, 이 BaseStream이 AutoCloseable 인터페이스를 구현하고 있다. 그래서 Stream을 try-with-resources 문에서 사용하면 자동으로 close() 메서드가 호출되어 리소스를 해제한다.

try (Stream<User> userStream = userRepository.streamAll()) { ///stream close
    userStream.forEach(user -> {
        // 사용자 처리
    });
}

///
@Query("SELECT u FROM User u")
Stream<User> streamAll();

 

Stream과 트랜잭션의 관계

  1. Stream은 지연 로딩을 기반으로 동작: 커서방식
    • JPA에서 **Stream<T>**는 지연 로딩(lazy loading) 방식으로 데이터를 가져온다. 즉, 데이터를 한 번에 모두 로드하는 대신, 필요할 때마다 데이터를 순차적으로 가져오는 방식이다.
    • 이 과정에서 데이터베이스 연결이 열려 있어야 스트리밍을 통해 데이터베이스에서 데이터를 지속적으로 읽어올 수 있다.
  2. @Transactional은 트랜잭션을 유지:
    • 트랜잭션이 활성화되면, JPA는 **데이터베이스 연결(session)**을 트랜잭션이 끝날 때까지 유지한다.
    • **Stream**은 트랜잭션이 끝날 때까지 데이터를 계속해서 가져와야 하므로, 트랜잭션이 열려 있는 상태에서 스트리밍 작업을 수행해야 한다.
    • 만약 트랜잭션이 열려 있지 않으면, 데이터베이스 연결이 닫히고, LazyInitializationException 같은 오류가 발생할 수 있다. 이는 트랜잭션 외부에서 지연 로딩을 시도했을 때 발생하는 오류이다.
    • 트랜잭션 타임아웃: 트랜잭션이 설정된 시간 내에 완료되지 않으면 타임아웃이 발생해 트랜잭션이 롤백됩니다.
    • 데이터베이스 커넥션 유지 문제: 커서 방식으로 데이터베이스 커넥션을 오랜 시간 열어둬야 하기 때문에, 데이터베이스 연결 자원이 부족해질 수 있습니다.
    • 락 경합: 트랜잭션이 너무 오래 유지되면, 다른 트랜잭션들이 동일한 데이터를 수정하려고 할 때 **락 경합(lock contention)**이 발생할 수 있습니다.
  3. @Transactional 없이 사용할 경우:
    • 트랜잭션이 열리지 않으면, 데이터베이스 세션이 닫히기 때문에 스트리밍이 중간에 끊기고 예외가 발생할 가능성이 높다.
    • 따라서 Stream을 사용해 데이터를 처리할 때는 트랜잭션이 유지되어야만 안전하게 데이터베이스와의 연결을 유지하면서 지연 로딩을 통한 스트리밍이 가능다.
@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Transactional(readonly=true, timeout = 300) // 트랜잭션 타임아웃을 300초로 설정
    public void processUsers() {
        try (Stream<User> userStream = userRepository.findAllUsersByStream()) {
            userStream.forEach(user -> {
                // 유저 처리 로직
                System.out.println(user.getName());
            });
        }
    }
}
728x90
반응형
반응형
@Transactional
public void test(){
    //entity는 대충 만들어졌다고 침
    entity1 = entityRepository.save(new Entity1()); 
    //여기서 save; seq는 mysql-autoincrement
    log.info(entity1.seq);
}


//////////////////////////
@Entity
@Table(name = "hd_test")
//@IdClass(Entity1.Pk.class)
public class Entity1 {
    @Id
    @GeneratedValue(strategy=GenerationType.IDENTITY)
    private BigInteger seq;
//    @Id
    private LocalDateTime logDate;
	...
}

entity1.seq가 null이 아닌 경우

  1. @Transactional이 있을 때 / 없을 때 (상관없음)
  2. entity의 @Id가 seq 한 개 이고 @GeneratedValue = Identity일 경우
    • composite key이거나 @GeneratedValue가 없으면 에러 남

참고로 @GeneratedValue는 primary key 일 때만 작동

 

참고:

https://dogcowking.tistory.com/61

 

composite id 사용하는 개체 save 후 ID 받기

* 개요 Hibernate 에서 composite key (복합키) 로 선언했을 때 하나의 컬럼은 자동 생성이 가능한가? ex : INSERT 그룹명은 지정해주고, 그룹내 번호는 DBMS 에 의해 자동 생성되는 경우 * 일반적인 composite id

dogcowking.tistory.com

 

728x90
반응형
반응형

powermock이란? junit4로 테스트코드를 작성 시 static method를 mock할 수 있게 하는 방법

Mockito Mock Static Method using PowerMock
PowerMock provides different modules to extend Mockito framework and run JUnit and TestNG test cases.

Note that PowerMock doesn’t support JUnit 5 yet, so we will create JUnit 4 test cases. We will also learn how to integrate TestNG with Mockito and PowerMock.

Junit5는 지원하지 않나보다.

 

개발 환경: ivy(ant), java6

아래와 같이 관련 lib를 선언해준다.

<dependency org="org.powermock" name="powermock-core" rev="1.6.3"/>
<dependency org="org.powermock" name="powermock-module-junit4" rev="1.6.3"/>
<dependency org="org.powermock" name="powermock-api-mockito" rev="1.6.3"/>
<dependency org="org.powermock" name="powermock-api-easymock" rev="1.6.3"/>
<dependency org="org.javassist" name="javassist" rev="3.19.0-GA" conf="runtime" force="true" transitive="false"/>
<dependency org="org.mockito" name="mockito-core" rev="1.10.19"/>

 

목표: 아래의 static method를 테스트 코드 내에서 mock한다.

public static DataHandler getDataHandler() throws IllegalStateException {
   ...
}
We need to do the following to integrate PowerMock with Mockito and JUnit 4.

1. Annotate test class with @RunWith(PowerMockRunner.class) annotation.
2. Annotate test class with @PrepareForTest and provide classed to be mocked using PowerMock.
3. Use PowerMockito.mockStatic() for mocking class with static methods.
4. Use PowerMockito.verifyStatic() for verifying mocked methods using Mockito.

 

powermockito가 기존의 junit4와 혼용이 가능한지 궁금했는데, junit4코드를 아래와 같이 수정하니 잘 되는 것을 확인할 수 있었다.

@RunWith(PowerMockRunner.class)  //1.
@PrepareForTest(DataHandlerFactory.class) //2.
//@RunWith(MockitoJUnitRunner.class)
public class HuntBOTest {

//기존 junit4 스타일로 선언
    @InjectMocks
    private HuntBO huntBO;
    @Mock
    private HuntDAO huntDAO;

    public String ALPHA_FILE = "data-alpha.xml";

    public void includeForAlpha(){
    //mockStatic 이후에 하면 null이 나오기 때문에 mock하기 전에 stub을 만들어야 한다.
        DataHandler sample = DataHandlerFactory.create(ALPHA_FILE);

        PowerMockito.mockStatic(DataHandlerFactory.class); //3.
        when(DataHandlerFactory.getDataHandler()).thenReturn(sample);
    }

    @Test
    public void getTop1000__subList__alpha(){
        includeForAlpha();
        when(huntDAO.selectHuntTop1000AfterAug()).thenReturn(getList());
        List<HuntRank> hrList = huntBO.getTop1000();
        List<HuntRank> hrListSub = hrList.subList(0, 1);
        assertThat(hrListSub.get(0).getRank(), is(1));
    }
    
}
728x90
반응형

'개발 > java' 카테고리의 다른 글

[java] for loops and performance  (0) 2022.08.04
[jmh] benchmark test  (0) 2022.08.04
[java] jvm, java, gc  (0) 2022.02.24
[keyword] transient in serialization  (0) 2022.02.16
[junit5] no test found / checked exception is invalid  (0) 2022.02.07
반응형
webjar?
클라이언트에서 사용하는 웹라이브러리(jquery, bootstrap, React.js, Vue.js, Angular 등) 를 JAR 파일 안에 패키징한 것이다.

jquery, bootstrap 등과 같은 static library를 사용하는 경우 보통 개별적으로 버전을 알아봐서 resources 폴더 안에 다운로드하여 넣거나 cdn 주소를 명시하거나 하는 방법으로 사용한다. 이 경우 필요한 라이브러리를 일일이 찾아서 추가해줘야 하고 버전 관리를 따로 해줘야 하는 번거로움이 있는데, webjar을 이용하면 버전관리가 좀 더 수월하고 springboot의 여러 지원도 받을 수 있다.

maven central repo에는 org.webjar.*로 javascript library가 등록되어 있다. 하지만 실제로 찾아보니 maven repo에서 찾는 것 보다 webjars 공식 사이트에서 하는 게 편하다.
www.webjars.org/

WebJars - Web Libraries in Jars

www.webjars.org


pom.xml이나 build.gradle에 아래와 같이 필요한 라이브러리 dependency를 추가한다.

    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>webjars-locator-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>sockjs-client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>stomp-websocket</artifactId>
        <version>2.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>jquery</artifactId>
        <version>3.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>bootstrap</artifactId>
        <version>5.1.3</version>
    </dependency>


특히 webjars-locator-core는 html include 시 아래와 같이 버전을 명시하지 않아도 자동으로 가져오기 때문에 사용을 추천한다.

///before; version 정보 있음
<script src="/webjars/jquery/3.5.1/dist/jquery.min.js"></script>

///after; version 정보 없음
<script src="/webjars/jquery/jquery.slim.js"></script>
<script src="/webjars/sockjs-client/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>


이를 사용하지 않으면 만약 버전을 하나 올릴 경우 모든 include 안의 버전을 수정해줘야 하는 번거로움이 있는데, webjars-locator-core를 사용하면 pom.xml의 명시된 버전으로 자동 사용한다.

728x90
반응형
반응형

2022.07.07 - [개발/spring] - [liquibase] springboot과 연동가능 한 db형상관리 툴

 

[liquibase] springboot과 연동가능 한 db형상관리 툴

liquibase란? Liquibase is a database schema change management solution that enables you to revise and release database changes faster and safer from development to production. DB변경 정보를 관리하..

bangpurin.tistory.com

 

이전에 liquibase에 대해 알아본 적이 있다. 그 외로 알고 있던 flyway와의 차이점이 궁금하여 비교해본다.

 

공통점

  • DB 스키마 변화를 관리하는 도구
  • 오픈소스 기반
  • 자바와 연동 가능; springboot, vert.x
  • maven, gradle과 연동 가능
  • cli와 독립적으로 작동 가능
  • 여러 DB 지원

 

차이점

  flyway liquibase
변화 정의 sql 작성 xml, yaml, json 으로 정의
변화 파일 규칙 prefix: V (for versioned), U (for undo), and R (for repeatable)
separator: __(two underscores)
suffix: .sql
V01__Add_New_Column.sql
이름 규칙 없음
로깅 테이블 flyway_schema_history databasechangelog
수정 순서 관리 변화 파일 명의 버전 순서대로 master_changelog 파일에 순서 작성
롤백 (유료버전) 취소해야하는 부분의 파일 이후에 변화 파일의 이름을 U로 해서 실행 (유료버전) 전체 롤백/ 버전별 취소(undo)가능
수정 부분반영(환경별) 각 환경별 config 파일을 다르게 세팅해서 context, label 이라는 기능을 통해서 일부만 반영가능(마치 태그를 걸러 그 태그만 모아서 적용)
자바 기반의 migration지원 지원, 자바 파일 안에서의 migration을 정의가능 지원 안 함
스냅샷 지원 안 함 지원
pre-condition(조건부 적용) 지원 안 함(procedure를 이용해 적용) 지원

 

when to use?

Flyway

  • java코드나 sql문을 통해 변화 적용, DB에 직접적인/full control 필요할 경우(ex. oracle, postgresql)

 

Liquibase

  • xml, yaml, json 사용 가능 -> 다양한 환경, 다양한 DB언어에 대응 가능
728x90
반응형

'개발 > sql' 카테고리의 다른 글

[mysql] 유저의 등수 구하기 rank under v8  (0) 2024.02.06
[DB] 분산환경에서 데이터 저장소 선택과 활용  (0) 2023.07.24
[mysql] jsonpath  (0) 2022.05.27
[sql] case vs if  (0) 2022.05.02
[sql] aggregate / window function(mysql)  (0) 2022.05.02
반응형

liquibase란?

Liquibase is a database schema change management solution that enables you to revise and release database changes faster and safer from development to production.

DB변경 정보를 관리하는 툴이다. 다양한 DB를 지원하며 cli로도 적용 가능하며, springboot project 와도 연동이 가능하다.

주의할 사항은 liquibase를 이용하여 DB정보를 관리하기로 했다면 수동 혹은 기타 방법으로 DB 작업을 해서는 안된다. 히스토리 관리가 안될 테니 어디서 꼬일지 모른다.

공식 홈페이지: https://docs.liquibase.com/install/tutorials/h2.html

 

Using H2 Databases on Windows | Liquibase Docs

-- liquibase formatted sql -- changeset liquibase:1 CREATE TABLE test_table (test_id INT, test_column VARCHAR, PRIMARY KEY (test_id)) Tip: Formatted SQL changelogs generated from Liquibase versions before 4.2 might cause issues because of the lack of space

docs.liquibase.com


https://otrodevym.tistory.com/entry/DB-%EB%B2%84%EC%A0%84%EB%A5%BC-%EA%B4%80%EB%A6%AC-%EC%BD%94%EB%93%9C%EB%A1%9C-%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95-LIQUIBASE

 

DB 버전를 관리 코드로 하는 방법 - LIQUIBASE

소스 형상 관리는 git이나 svn으로 하고 ppt나 보고서도 버전을 관리를 합니다. DB에 대한 형상관리를 하는 방법이 없는지 고민하다가 프로젝트를 하면서 접하게 된 LIQUIBASE입니다. 리퀴베이스는 DB

otrodevym.tistory.com

 

springboot 2.2 + jpa + liquibase

https://browndwarf.tistory.com/21

 

Spring Boot + JPA/Hibernate + Liquibase Project 구성 (feat. PostgreSQL)

나이가 들어가면서 기억력이 떨어지는 것이 느껴져서 Spring 프로젝트의 많은 Property들 중 자주 사용하는 것 위주로 내용을 한 번 정리하고자 포스팅을 쓴다 개발 환경 OpenJDK v1.8.0_212 Spring Boot 2.1.4

browndwarf.tistory.com

 

728x90
반응형
반응형

https://www.typescriptlang.org/docs/handbook/utility-types.html

 

Documentation - Utility Types

Types which are globally included in TypeScript

www.typescriptlang.org

위 원문을 번역한다.

 

Partial<Type>

: 특정 object의 일부분만 가질 때 사용한다. 항목을 지정하지 않고 그저 일부이면 가능.

interface Todo {
  title: string;
  description: string;
}
 
function updateTodo(todo: Todo, fieldsToUpdate: Partial<Todo>) {
  return { ...todo, ...fieldsToUpdate };
}
 
const todo1 = {
  title: "organize desk",
  description: "clear clutter",
};
 
const todo2 = updateTodo(todo1, {
  description: "throw out trash",
});


console.log(updateTodo(todo1, todo2));

===========

[LOG]: {
  "title": "organize desk",
  "description": "throw out trash"
}

위 소스에서 Partial<Todo>는 Todo 인터페이스가 가진 항목의 일부를 가진 object이다.

 

Required<Type>

: 특정 object의 모든 항목을 반드시 가져야 한다.

// @errors: 2741
interface Props {
  a?: number;
  b?: string;
}

const obj: Props = { a: 5 };

const obj2: Required<Props> = { a: 5 };

==================
Errors in code
Property 'b' is missing in type '{ a: number; }' but required in type 'Required<Props>'.

위 소스에서 Required<Props>로 선언된 obj2는 optional로 선언한 b항목이 없어서 에러가 난다.

 

Readonly<Type>

: 처음 선언된 값을 변경할 수 없다.

// @errors: 2540
interface Todo {
  title: string;
}

const todo: Readonly<Todo> = {
  title: "Delete inactive users",
};

todo.title = "Hello";

================
Errors in code
Cannot assign to 'title' because it is a read-only property.

위 소스에서 Readonly<Todo> todo로 선언되어 있기 때문에 todo안의 항목의 값을 바꾸려고 할 경우 에러가 난다.

이 타입은 Object.freeze 함수 내부에서 사용되고 있다.

function freeze<Type>(obj: Type): Readonly<Type>;

----------------

const obj = {
  prop: 42
};

Object.freeze(obj); //freeze시키면

obj.prop = 33; //값을 바꿔도
// Throws an error in strict mode

console.log(obj.prop); //안 바뀐다
// expected output: 42

 

Record<Keys, Type>

: 키가 Key이고 값이 Type인 객체 타입

interface CatInfo {
  age: number;
  breed: string;
}

type CatName = "miffy" | "boris" | "mordred";

const cats: Record<CatName, CatInfo> = {
  miffy: { age: 10, breed: "Persian" },
  boris: { age: 5, breed: "Maine Coon" },
  mordred: { age: 16, breed: "British Shorthair" },
};

console.log(cats.boris);

==============
[LOG]: {
  "age": 5,
  "breed": "Maine Coon"
}

참고로 Record는 인덱스 시그니처(Index Signature)는 대괄호로 객체를 접근하는 방식으로도 표현가능하다.

interface CatInfo {
  age: number;
  breed: string;
}

type CatName = "miffy" | "boris" | "mordred";

const cats: Record<CatName, CatInfo> = {
  miffy: { age: 10, breed: "Persian" },
  boris: { age: 5, breed: "Maine Coon" },
  mordred: { age: 16, breed: "British Shorthair" },
};

const catss: {[name: string] : CatInfo} = { // index signature
   boris: { age: 5, breed: "Maine Coon" },
}

console.log(cats.boris);
console.log(catss.boris);

다만 인덱스 시그니처는 문자열 리터럴을 Key로 사용하는 경우 오류가 발생한다. 위 catss를 `catss: {[name: CatName : CatInfo}` 로 선언하면 에러가 난다.

 

Pick<Type, Keys>

: 어떤 object의 항목 중 일부만 골라서 사용 가능하다. 비슷한 부분집합 형태의 interface를 또 만들 필요가 없어진다.

interface Todo {
  title: string;
  description: string;
  completed: boolean;
}

type TodoPreview = Pick<Todo, "title" | "completed">;

const todo: TodoPreview = {
  title: "Clean room",
  completed: false,
};

선택한 항목 외에 다른 것을 더 넣거나 덜 넣으면 에러가 난다.

 

Omit<Type, Keys>

: Pick과 반대 기능으로 특정 항목만 제하고 사용할 수 있다.

interface Todo {
  title: string;
  description: string;
  completed: boolean;
  createdAt: number;
}

type TodoPreview = Omit<Todo, "description">;

const todo: TodoPreview = {
  title: "Clean room",
  completed: false,
  createdAt: 1615544252770,
};

const todo2: TodoPreview = { // completed가 추가적으로 빠져서 에러가 난다.
  title: "Clean room",
  createdAt: 1615544252770,
};


type TodoInfo = Omit<Todo, "completed" | "createdAt">;

const todoInfo: TodoInfo = {
  title: "Pick up kids",
  description: "Kindergarten closes at 5pm",
};

제한 항목 이외의 것을 더 넣거나 덜 넣으면 에러가 난다.

 

Exclude<UnionType, ExcludedMembers>

: 앞의 것 빼기 뒤의 것을 타입으로 갖는다.

type T0 = Exclude<"a" | "b" | "c", "a">;

const test1: T0 = "a"; // 에러
const test2: T0 = "b";
const test3: T0 = "d"; // 애러

type T1 = Exclude<"a" | "b" | "c", "a" | "b">;

type T2 = Exclude<string | number | (() => void), Function>;

const test4: T2 = function(){ // 에러
    console.log("this")
}

const test6: T2 = function(a: string):string{ // 에러
    return "hello " + a;
};

type T3 = Exclude<string | number | Function, Function>;

const test5: T3 = function(){ // 에러
    console.log("this")
}

제한 타입 이외의 값을 가지면 에러가 난다.

 

Extract<Type, Union>

: 앞의 타입 중 뒤의 것만 추출한 것을 타입으로 가진다.

type T0 = Extract<"a" | "b" | "c", "a" | "f">;
const test:T0 = "f"; // 에러
const test2:T0 = "b"; // 에러
const test3:T0 = "a";

type T1 = Extract<string | number | (() => void) | ((args: string) => string), Function>;

const test4:T1 = function(a: string):string{
    return 'hello ' + a;
}

const test5:T1 = function():string{
    return 'hello';
}

const test6:T1 = function(a: string){ // 에러
    console.log("test " + a)
}

const test7:T1 = "guest" // 에러

위 소스에서 T1은 string, number, 인자 없는 함수, 인자 있고 return 있는 함수 중 함수의 형태만 타입으로 가질 수 있다.

test6은 return 이 없는 함수고 test7은 string이라 에러가 난다.

 

NonNullable<Type>

: Type에 들어오는 타입 중 null, undefined를 제한 타입만 가능하다

type T0 = NonNullable<string | number | undefined>;
// T0는 string, number만 가능

type T1 = NonNullable<string[] | null | undefined>;
// T1은 string[]만 가능

 

Parameters<Type>

: 함수의 파라미터를 타입으로 리턴한다.

declare function f1(arg: { a: number; b: string }): void;

type T0 = Parameters<() => string>;
//   T0 = []
type T1 = Parameters<(s: string) => void>;
//   T1 = [s:string]
type T2 = Parameters<<T>(arg: T) => T>;
//   T2 = [arg: unknown]
type T3 = Parameters<typeof f1>;
//   T3 = [arg: { a: number; b: string }]
type T4 = Parameters<any>;
//   T4 = unknown[]

// never는 any를 제외한 모든 타입의 원시타입이기때문에
// 함수타입 T에 never로 주어도 에러가 발생하지 않음
type T5 = Parameters<never>;
//   T5 = never

type T6 = Parameters<string>;
//    에러
type T7 = Parameters<Function>;
//    에러

위 소스에서 T6, T7은 '(...args: any) => any'의 형식이 아니라서 에러가 난다. 즉 Parameters가 인자로 받을 수 있는 것은 모든 파라미터를 인자로 받고 결괏값으로 모든 값을 리턴하는, 사실상 모든 함수가 된다.

 

ConstructorParameters<Type>

: 생성자를 갖는 함수 타입의 생성자 파라미터를 리턴한다. 함수가 아니라면 never를 리턴한다. 위 Parameters와 비슷하지만 생성자 파라미터로 한정한다.

type T0 = ConstructorParameters<ErrorConstructor>;
//   T0 = [message?: string | undefined]
type T1 = ConstructorParameters<FunctionConstructor>;
//   T1 = string[]
type T2 = ConstructorParameters<RegExpConstructor>;
//   T2 = [pattern: string | RegExp, flags?: string | undefined]
type T3 = ConstructorParameters<any>;
//   T3 = unknown[]

//에러발생
type T4 = ConstructorParameters<Function>;
//   T4 = never


============
class Person {
    private _firstname: string
    private _lastname: string

    constructor(firstname: string, lastname: string) {
        this._firstname = firstname
        this._lastname = lastname
    }
}

type typeIs = ConstructorParameters<typeof Person>;
let personConstructionArgs: typeIs = ['first', 'last']

 

ReturnType<Type>

: 함수의 리턴타입을 가져온다.

declare function f1(): { a: number; b: string };

type T0 = ReturnType<() => string>;
//   T0 = string
type T1 = ReturnType<(s: string) => void>;
//   T1 = void
type T2 = ReturnType<<T>() => T>;
//   T2 = unknown
type T3 = ReturnType<<T extends U, U extends number[]>() => T>;
//   T3 = number[]
type T4 = ReturnType<typeof f1>;
//   T4 = { a: number; b: string }
type T5 = ReturnType<any>;
//   T5 = any
type T6 = ReturnType<never>;
//   T6 = never;

//에러
type T7 = ReturnType<string>;
//   T7 = any
type T8 = ReturnType<Function>;
//   T8 = any

 

InstanceType<Type>

: 생성자로 초기화된 인스턴스 타입을 리턴한다.

class C {
  x = 0;
  y = 0;
}

type T0 = InstanceType<typeof C>;
//   T0 = C
const test :T0 = {x: 1, y: 3, z: 2} // 에러

type T1 = InstanceType<any>;
//   T1 = any
type T2 = InstanceType<never>;
//   T2 = never

// 에러
type T3 = InstanceType<string>;
//   T3 = any
type T4 = InstanceType<Function>;
//   T4 = any

 

ThisParameterType<Type>

: 함수 타입의 this 파라미터의 type을 가져온다. this를 가지지 않는 함수 타입이면 unknown을 반환한다.

function toHex(this: Number) {
  return this.toString(16);
}

type T = ThisParameterType<typeof toHex>;
//   T = Number

function numberToString(n: ThisParameterType<typeof toHex>) {
  return toHex.apply(n);
}

 

OmitThisParameter<Type>

: 타입의 this 파라미터를 무시한다. 타입에 this가 없으면 그냥 Type이다. 있으면 this가 없는 Type 이 만들어진다.

function toHex(this: Number) {
  return this.toString(16);
}
console.log(toHex.call(2)); //2

type T = OmitThisParameter<typeof toHex>;
//   T = () => string
const fiveToHex: T = toHex.bind(16);

console.log(fiveToHex()); //10

 

728x90
반응형

'개발 > javascript' 카테고리의 다른 글

[js][IE] inline script [object]  (0) 2023.01.11
[js] 0.07 * 100?  (0) 2023.01.03
[js][IE] Invalid Date in IE  (0) 2022.04.26
[js] 자바스크립트 기초  (0) 2022.01.25
반응형

reactor를 공부하다 보면 Mono/Flux 이후에 많이 보게 되는 게 Sinks이다.. 솔직히 아직 mono와 flux도 완벽하게 이해하지 못했는데, 자꾸 새로운 개념이 튀어나오는 게 두렵긴 하지만 ㅋㅋ 어쨌건 계속 봐야 모래알 같은 지식이 쌓여 해변가가 될 것이라 믿기에, 짧은 지식으로 나마 원문을 파보도록 한다.

java doc에 기재되어 있는 Sinks의 개념이다. mono/flux보다 한 단계 더 내부에서 실제 동작하는 시그널의 구조체 같은 느낌인 듯 한데, 저것만 봐서 완전히 와닿지 않는다.

https://projectreactor.io/docs/core/release/reference/#processors

 

Reactor 3 Reference Guide

10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3) 1

projectreactor.io

기존에 processor라는 개념이 있는데, publisher이면서 subscriber의 역할을 하는 친구이다. 보통 reactive stream의 중간체(데이터를 구독받았다가 다시 방출하는..)를 가리키는데 reactive stream에서는 publisher로 보통 구현한다(이때 subscriber 인터페이스의 함수를 직접 호출하지 않도록 조심해야 한다).

 In Reactor a sink is a class that allows safe manual triggering of signals.

Sinks는 안전하게 event trigger를 할 수 있게 하는 interface며 processor의 대용으로 사용할 수 있다.

 

 

1. 일반 구독(flux.subscribe)

@Test
@DisplayName("해당 구독행위는 array에 대해서 하였기에 뒤에서 변화가 이루어진 부분에 대해서는 이미 끝난 행위라서 아무런 변화가 없음")
public void flux(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data)); //구독
    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터 변화
}
[a, b, c, d, e]

구독 이후에 발생한 데이터의 변화는 감지하지 않는다. 데이터가 정해져있고 거기에 구독자를 추가한다.

 

2. subscribe를 계속 감지하게 하려면? Processor 사용

Flux의 Processor에는 FluxProcessor, EmitterProcessor, ReplayProcessor 등 많은 프로세서들이 존재하는데, 그중 EmitterProcessor는 여러 구독자(subscriber)가 사용할 수 있는 구독과 발행이 동시해 일어나는 프로세서이다.

EmitterProcessor: An implementation of a message-passing Processor implementing publish-subscribe with synchronous (thread-stealing and happen-before interactions) drain loops.

또한, 구독 행위가 등록되고 난 이후에 해당 이벤트가 발생하면 구독하는 대상에게 데이터를 동기적으로 전달한다.

 

EmitterProcessor를 사용한 코드

@Test
public void beforeSink(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e"}));

    //프로세서 시작 구간.
    EmitterProcessor<List<String>> data = EmitterProcessor.create();  //발행인
    data.subscribe(t -> System.out.println("1번 : "+t));  //구독자 추가
    FluxSink<List<String>> sink = data.sink();   //배달부
    sink.next(array); //배달 완료

    array.addAll(Arrays.asList(new String[]{"new", "data", "hello"}));  //내용 추가

    data.subscribe(t -> System.out.println("2번 : "+t));  //구독자 추가
    sink.next(array);  //배달

    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //내용 추가
    sink.next(array);  //배달
}
1번 : [a, b, c, d, e]
1번 : [a, b, c, d, e, new, data, hello]
2번 : [a, b, c, d, e, new, data, hello]
1번 : [a, b, c, d, e, new, data, hello, 1, 2, 3]
2번 : [a, b, c, d, e, new, data, hello, 1, 2, 3]

Processor는 기존에 있거나 새롭게 등장한 구독자(subscriber)에게 데이터(old data)를 전달한다. 구독자 중심이랄까. 

발행하는 기관(processor)을 건설하고 구독자를 모집(subscribe)한 뒤에 계속해서 발행(sink.next)하는 형태이다. 구독 이후에 데이터가 변경되어 발행되었다면 추가 구독없이 변경된 데이터를 받아볼 수 있다. 

작업 할 내용이 데이터가 중심이면 1번의 일반 구독형태를 사용하고, 구독자가 중심이면 프로세스를 사용하면 될 것 같다.

 

3. EmitterProcessor 부분이 reactor 3.5에서 deprecated 돼서 Sinks로 바꿔본다.

@Test
@DisplayName("flux: hot 테스트")
public void sinkTest_multicast(){
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e"})); //내용

    Sinks.Many<List<String>> sink = Sinks.many().multicast().directBestEffort(); //발행인
    sink.asFlux().subscribe(data -> System.out.println("1번 : " + data)); //구독자 추가
    sink.tryEmitNext(array); //발행함
    sink.asFlux().subscribe(data -> System.out.println("2번 : " + data)); //구독자 추가
    array.addAll(Arrays.asList(new String[]{"1", "2", "3", "4", "5"})); //내용 추가
    sink.tryEmitNext(array); //발행함
    sink.asFlux().subscribe(data -> System.out.println("3번 : " + data)); //구독자 추가
}
1번 : [a, b, c, d, e]
1번 : [a, b, c, d, e, 1, 2, 3, 4, 5]
2번 : [a, b, c, d, e, 1, 2, 3, 4, 5]

1번은 한번 구독으로 변경된 데이터도 다시 받았고, 2번은 구독 이후 데이터를 받았다. 3번은 구독(subscribe) 이후에 아무 데이터 변경이 없어서 로그에 남지 않는다.

 


 

아래 예제를 따라하면 Sink의 multicast/unicast의 차이, sinks.one/sinks.many를 공부할 수 있다. hot/cold publisher에 대한 개념은 덤!

https://prateek-ashtikar512.medium.com/projectreactor-sinks-bac6c88e5e69

 

ProjectReactor — Sinks

public static <T> Sinks.One<T> one() A Sinks.One that works like a conceptual promise: it can be completed with or without a value at any…

prateek-ashtikar512.medium.com

 

@Test
@DisplayName("n subscribers :: 1 message")
public void sinkOne(){
    Sinks.One<Object> sink = Sinks.one(); //n subscribers :: 1 message
    Mono<Object> mono = sink.asMono();

//        mono.subscribe(d -> System.out.println("Sam: " + d));
    mono.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); //많이 받겠다고 해고 sinks.one 자체가 1개만 방출해서 하나만 받음
            System.out.println(">>>> onSubscribed " + s); // 이거 호출되고..
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });
    mono.subscribe(d -> System.out.println("Sam: " + d));

    sink.tryEmitValue("Hollo");
    sink.tryEmitValue("hi~"); //안받음! one이라서.. 하나만받음
}

@Test
@DisplayName("1 subscriber :: n message")
public void unicast(){
    Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

//        flux.subscribe(d-> System.out.println("Date : "+ d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
            System.out.println(">>>> onSubscribed " + s); // 이거 호출되고..
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });
    flux.subscribe(d-> System.out.println("Date : "+ d));

    sink.tryEmitNext("hi");
    sink.tryEmitNext("i am hungry");
    sink.tryEmitNext("bye");
}

@Test
@DisplayName("sink.many는 갑자기 많은걸 방출하면 FAIL_NON_SERIALIZED 에러가 나서 이때는 재시도를 해줘야 데이터 누수가 없음")
public void unicast2() throws InterruptedException {
    Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

    List<Object> list = new ArrayList<>();
    flux.subscribe(e -> list.add(e));

    for(int i = 0; i< 1000; i++){
        final int j = i;
        CompletableFuture.runAsync(() -> {
            sink.emitNext(j, new Sinks.EmitFailureHandler() {
                @Override
                public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
                    System.out.println(emitResult.toString());
                    //return true; // true if the operation should be retried, false otherwise.
                    // Sinks.many() factory methods that fail with EmitResult.FAIL_NON_SERIALIZED when multiple producers emit concurrently
                    //그래서 FAIL_NON_SERIALIZED 에러일 때는 재시도하도록 해줘야 함
                    return emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
                }
            });
        });
    }

    Thread.sleep(5_000);
    System.out.println(list.size());
}

@Test
@DisplayName("n subscribers :: n message / 구독 이후 발행한 메세지부터 받음 / hot")
public void multicast(){
    Sinks.Many<Object> sink = Sinks.many().multicast().onBackpressureBuffer();
    Flux<Object> flux = sink.asFlux();

    sink.tryEmitNext("hi");
    sink.tryEmitNext("hello");

    flux.subscribe(d -> System.out.println("SAM: " + d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(5); //총 가져올 갯수 1이면 sink.one과 같은 효과
            System.out.println(">>>> onSubscribed "); // 이거 호출되고 이제부터 데이터 받음
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o); //실 데이터
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });

    sink.tryEmitNext("????");
    flux.subscribe(d-> System.out.println("here: " +d));
    sink.tryEmitNext(" new");
}

@Test
@DisplayName("n subscribers :: n message / 구독 이전 발행 한 메세지도 받음 / cold")
public void manyReplay(){
    Sinks.Many<Object> sink = Sinks.many().replay().all(); //onSubscribe 하고 바로 첨부터하고 그담부터는 리스닝
    Flux<Object> flux = sink.asFlux();

    sink.tryEmitNext("hi");
    sink.tryEmitNext("how are you");

    flux.subscribe(d -> System.out.println("First: " + d));
    flux.subscribe(new Subscriber<Object>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); //총 가져올 갯수 1이면 sink.one과 같은 효과
            System.out.println(">>>> onSubscribed "); // 이거 호출되고 이제부터 데이터 받음
        }

        @Override
        public void onNext(Object o) {
            System.out.println(">>>> onNext " + o); //실 데이터
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(">>>> onError " + t.getMessage()); //errrrr
        }

        @Override
        public void onComplete() {
            System.out.println(">>>> onComplete");
        }
    });

    sink.tryEmitNext("???");
    flux.subscribe(d -> System.out.println("Demon: " +d));
    sink.tryEmitNext("new");
}
728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webclient] 비슷한데 뭘 써야할지 모르겠는 것들  (0) 2022.04.01
[webflux] block vs toFuture  (0) 2022.03.31
[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 10. Flux  (0) 2022.03.25
반응형

websocket vs RESTful http

webscoket(pubnub.com)

  • 공통점
    1. tcp layer
    2. OSI 7계층 중 최상위 application layer
    3. TLS security(https, wss); 443 port
  • 차이점
    • websocket
      1. bi-directional: 양방향 통신
      2. full-duplex: 서버와 클라이언트가 동시에 독립적으로 상대에게 이야기할 수있음
      3. event based
      4. stateful protocol: 한쪽에서 끊을때까지 계속 연결된 상태
      5. persistent single TCP connection: websocket connection lifecycle동안 하나의 tcp connection에서 통신
      6. ws:// wss://
      7. 리얼타임/게임/채팅과 같이 실시간 업데이트, 지속적인 데이터를 받아야 하는 곳
    • RESTful http
      1. uni-directional: 한번에 한 방향으로 통신
      2. request-response based
      3. stateless protocol
      4. non persistent connection: 요청하고 끊고 응답주고 끊고, 10개의 요청을 보내면 10개의 connection이 생김
      5. http://, https://
      6. 과거 데이터, 데이터를 한번만 받아도 되는 곳

 

앞으로 아래 두 글을 바탕으로 샘플 websocket 프로젝트를 만들고 변형한다. websocket과 webflux는 사상이 비슷(event driven)한 친구라 이번 기회에 두 스택을 같이 사용해보려고 한다.

환경: java17, springboot2.5.6 / webflux / mongo on docker

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websocket

 

Web on Reactive Stack

The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports

docs.spring.io

https://www.baeldung.com/spring-5-reactive-websockets

 

Reactive WebSockets with Spring 5 | Baeldung

A quick and practical example of Spring Framework 5 Reactive WebSockets API.

www.baeldung.com

 

728x90
반응형

+ Recent posts