반응형

이전 글: 2022.03.23 - [개발/reactive] - [reactive] 9. Mono

 

[reactive] 9. Mono

이전 글: 2022.03.22 - [개발/reactive] - [reactive] 8. webflux [reactive] 8. webflux 이전 글: 2022.03.21 - [분류 전체보기] - [reactive] 7. CompletableFuture [reactive] 7. CompletableFuture 이전 글:..

bangpurin.tistory.com

 

오늘은 flux의 간단한 사용법을 배운다.

 A Flux object represents a reactive sequence of 0.. N items,
while a Mono object represents a single-value-or-empty (0.. 1) result.

Mono와 Flux의 차이를 검색해보면, mono는 하나의 값을 return할return 할 때 쓰이는 반면 flux는 n개의 값을 return 할 때 쓰인다고 나온다. 그런데 사실 두 차이를, 어쩌면 그 의미를 잘 모르겠다. 아래의 예시에서는 같은 결과를 mono/flux로 각각 작성 가능하다는 것을 보여준다. 차이는 flux를 쓰면 flux라이브러리의 다양한 오퍼레이션 함수를 사용할 수 있다고 하는데, 데이터를 수정 없이 그대로 내린다면 mono나 flux나 그게 그거 아닌가..?

@GetMapping("/event/{id}")
Mono<List<Event>> hello(@PathVariable long id){
    //return Mono.just(new Event(id, "event "+ id));
    List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
    //데이터를 컬랙션으로 묶어서 다루고, 각 데이터 다루거나 편집할 때 힘듦
    return Mono.just(list);
}

@GetMapping(value = "/events")
Flux<Event> events(){
    List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
    //data stream -> .map 등 사용 가능
    return Flux.fromIterable(list);
    //같은 결과
    //return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}

@Data
@AllArgsConstructor
public static class Event{
    long id;
    String value;
}

위와 같이 작성하고 두 api를 요청했을 때, 같은 응답 결과가 내려온다.

toby1 % curl localhost:8080/events
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]                                                                                                                                             nhn@AL01590036 toby1 % curl localhost:8080/events

 

events의 데이터 타입을 바꾸면

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
    List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
    //data stream -> .map 등 사용 가능
    return Flux.fromIterable(list);
    //return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}
toby1 % curl localhost:8080/events
data:{"id":1,"value":"event1"}

data:{"id":2,"value":"event2"}

데이터가 나눠서 들어오는 것을 볼 수 있다.

자바 8의 stream의 기능을 써서 조금 더 수정해보면 아래와 같다.

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
    //데이터생성 using stream
    //Stream<Event> s = Stream.generate(() -> new Event(System.currentTimeMillis(), "val"));
    //data stream -> .map 등 사용 가능; 위와 동일한 결과
    //return Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "val")).limit(10));
    return Flux //server sent event
            .fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "val")))
            .delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
            .take(10);//10개의 request를 보내고 다 오면 cancel 처리
}

stream을 드러내고 flux 기능만을 사용하면 아래와 같겠다.

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
   return Flux //server sent event
            //.range(1, 10) 앞에서 미리 정하고 들어갈 수도 있음;
            //.<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value")))//데이터를 계속 흘러서 보내는 역할; type hint 줘야함
            //상태값을 바꿔서 리턴; 초기상태, 상태 바꿔주는 함수 그담 상태 리턴
            .<Event, Long>generate(()->1L, (id, sink) -> {
                sink.next(new Event(id, "value" + id));//값을 받아 이벤트를 생성해서 싱크로 보내고
                return id+1;//다음 상태 리턴; id로 들어가겟지
            })
            .delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
            .take(10);//10개의 request를 보내고 다 오면 cancel; 뒤에서 끊는 개념
}

flux의 zip을 활용하면 아래 두 가지 방법이 가능하다.

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
   Flux<Event> es = Flux //server sent event
            //상태값을 바꿔서 리턴; 초기상태, 상태 바꿔주는 함수 그담 상태 리턴
            .<Event, Long>generate(()->1L, (id, sink) -> {
                sink.next(new Event(id, "value" + id));//값을 받아 이벤트를 생성해서 싱크로 보내고
                return id+1;//다음 상태 리턴; id로 들어가겟지
            })
            //.delayElements(Duration.ofSeconds(1))//background thread를 별도로 만들어서 처리
            //.take(10);//10개의 request를 보내고 다 오면 cancel; 뒤에서 끊는 개념
    ;
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
    //결합해서 delay의 효과를 볼 수 있음
    return Flux.zip(es, interval).map(tu -> tu.getT1());
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Event> events(){
    Flux<String> es = Flux.generate(sink -> sink.next("value"));
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); //0부터 시작
    //event 조합
    return Flux.zip(es, interval).map(tu -> new Event(tu.getT2(), tu.getT1())).take(10);
}

비슷한 내용 테스트

https://www.devkuma.com/docs/spring-webflux/

 

Spring WebFlux의 간단한 사용법

여기서는 Spring WebFlux의 간단한 사용법에 대해 소개하겠다. Spring WebFlux는 Spring 5에서 추가된 논블로킹(Non-Blocking) 런타임에서 리액티브 프로그래밍을 할 수 있는 새로운 Web 프레임워크이다. 위의

www.devkuma.com

 

728x90
반응형

'개발 > reactive' 카테고리의 다른 글

[webflux] 실무투입고민  (0) 2022.03.30
[spring] spring-web and spring-webflux  (0) 2022.03.25
[reactive] 9. Mono  (0) 2022.03.23
[reactive] 8. webflux  (0) 2022.03.22
[reactive] 7. CompletableFuture  (0) 2022.03.21

+ Recent posts