반응형

이전 글: 2022.03.18 - [개발/reactive] - [reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet

 

[reactive] 5. 비동기 RestTemplate과 비동기 MVC/Servlet

오늘은 5강을 이어서 학습한다. 1. default, blocking request 아래 그림처럼 2초씩 지연이 있는 /service를 요청하는 /rest를 100건 rest template로 요청한다. /rest의 스래드가 한 개라서 2초씩 대기한다. @Sl..

bangpurin.tistory.com

 

 

이전 글의 콜백 지옥을 새로운 클래스 선언을 통해 리팩토링해본다.

//////////////before
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "f1" + idx);
f1.addCallback(s -> {
    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
    f2.addCallback(s2 -> {
        ListenableFuture<String> f3 = myService.work(s2.getBody());
        f3.addCallback(s3 -> {
            dr.setResult(s3);
        }, e3 -> {
            dr.setErrorResult(e3.getMessage());
                }
        );
    }, e2-> {
        dr.setErrorResult(e2.getMessage());
    });
}, e-> {
    //throw 하면 어디서 에러가 난지 알 수 없어 비동기라
    dr.setErrorResult(e.getMessage());
});

///////////after
Completion
    //최초 실행
    .from(rt.getForEntity(URL1, String.class, "f1" + idx))
    //두번재 받아서 작업하고 return 해야함
    .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
    //어디서든 에러가 나면
    .andError(e -> dr.setErrorResult(e.toString()))
    //수행만하고 끝내는 아이 consumer
    .andAccept(s -> dr.setResult(s.getBody()));
public static class AcceptCompletion extends Completion{
    Consumer<ResponseEntity<String>> con;
    public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
        this.con = con;
    }

    @Override
    void run(ResponseEntity<String> val) {
        con.accept(val);
    }
}

public static class ErrorCompletion extends Completion{
    Consumer<Throwable> econ;
    //listenablefuture 실패 시 받는 throwable 처리
    public ErrorCompletion(Consumer<Throwable> econ) {
        this.econ = econ;
    }

    @Override
    void run(ResponseEntity<String> val) {
        //정상적일때 실행되면 안됨; 패스
        if(next != null){
            next.run(val);
        }
    }

    @Override
    void error(Throwable e) {
        econ.accept(e);
    }
}

public static class ApplyCompletion extends Completion{
    Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
    public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
        this.fn = fn;
    }

    @Override
    void run(ResponseEntity<String> val) {
        ListenableFuture<ResponseEntity<String>> lf = fn.apply(val);
        lf.addCallback(s -> complete(s), e-> error(e));
    }
}

public static class Completion{
    Completion next;

    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
        Completion c = new Completion();

        lf.addCallback(s -> {
            c.complete(s);
        }, e -> {
            c.error(e);
        });
        return c;
    }

    void error(Throwable e) {
        if(next != null){
            next.error(e);
        }
    }

    void complete(ResponseEntity<String> s) {
        if(next != null){
            next.run(s);
        }
    }

    void run(ResponseEntity<String> val) {
    }

    public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fun){
        Completion c = new ApplyCompletion(fun);
        this.next = c;
        return c;
    }

    public void andAccept(Consumer<ResponseEntity<String>> con){
        Completion c = new AcceptCompletion(con);
        this.next = c;
    }

    public Completion andError(Consumer<Throwable> econ){
        Completion c = new ErrorCompletion(econ);
        this.next = c;
        return c;
    }
}

위와 같이 만들자니 세 번째 콜을 아래와 같이 추가할 때 형 변환 에러가 난다.

  .andApply(s -> myService.work(s.getBody)

andApply는 request로 ResponseEntity<String>을 주고 response로 ListenableFuture<ResponseEntity<String>>를 받아야 하지만 work 함수는 그렇지 않다.

그러면 저 request/response를 만족하는 같은 내용의 함수를 또 작성해야 하는가?

뭐 그래도 작동은 하겠지만, 매번 함수를 작성하는 것은 깔끔하지 못하기에 generic을 통해 해결한다.

public static class AcceptCompletion<S> extends Completion<S, Void>{
    Consumer<S> con;
    public AcceptCompletion(Consumer<S> con) {
        this.con = con;
    }

    @Override
    void run(S val) {
        con.accept(val);
    }
}

public static class ErrorCompletion<T> extends Completion<T, T>{
    Consumer<Throwable> econ;
    //listenablefuture 실패 시 받는 throwable 처리
    public ErrorCompletion(Consumer<Throwable> econ) {
        this.econ = econ;
    }

    @Override
    void run(T val) {
        //정상적일때 실행되면 안됨; 패스
        if(next != null){
            next.run(val);
        }
    }

    @Override
    void error(Throwable e) {
        //계속 타고온 에러를 처리
        econ.accept(e);
    }
}

public static class ApplyCompletion<S, T> extends Completion<S, T>{
    Function<S, ListenableFuture<T>> fn;
    public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
        this.fn = fn;
    }

    @Override
    void run(S val) {
        ListenableFuture<T> lf = fn.apply(val);
        lf.addCallback(s -> complete(s), e-> error(e));
    }
}

public static class Completion<S, T>{
    Completion next;

    public static <S, T> Completion<S,T> from(ListenableFuture<T> lf) {
        Completion<S, T> c = new Completion<>();

        lf.addCallback(s -> {
            c.complete(s);
        }, e -> {
            c.error(e);
        });
        return c;
    }

    void error(Throwable e) {
        if(next != null){
            next.error(e);
        }
    }

    void complete(T s) {
        if(next != null){
            next.run(s);
        }
    }

    void run(S val) {
    }

    public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fun){
        Completion<T, V> c = new ApplyCompletion(fun);
        this.next = c;
        return c;
    }

    public void andAccept(Consumer<T> con){
        Completion<T, Void> c = new AcceptCompletion(con);
        this.next = c;
    }

    public Completion<T, T> andError(Consumer<Throwable> econ){
        Completion<T, T> c = new ErrorCompletion<>(econ);
        this.next = c;
        return c;
    }
}

 

728x90
반응형

+ Recent posts