728x90
반응형
728x90
반응형
반응형

이전 글: 2022.03.16 - [개발/reactive] - [reactive] 4-1. java Future/FutureTask/Callable/Runnable

 

[reactive] 4-1. java Future/FutureTask/Callable/Runnable

callable vs runnable interface Runnable interface Callable interface java version package java 1.0 ~ java.lang java 1.5 ~ java.util.concurrent return 계산 결과를 받을 수 없음 계산 결과 받을 수 있음(..

bangpurin.tistory.com

이전 글과 오늘 글은 아래 유튜브를 보고 작성하였다.

 

1. using Spring Async

@Slf4j
@EnableAsync  //있어야 async 작동
@SpringBootApplication
public class Spring {

    @Component
    public static class MyService{
        //future인데 등록해서 받아볼 수 있게하는, 스프링이 만든 ListenableFuture
        //요청할 때 마다 스레드를 계속 만들어 비효율적인 simpleAsyncTaskExecutor -> 빈으로 풀 등록해서 쓰자
        //비동기가 여러개고 스레드 풀을 분리해서 지정하고 싶으면 Async("name") 지정 가능
        @Async
        public ListenableFuture<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(2_000);
            return new AsyncResult<>("hello");
        }
    }

    //task executor를 빈으로 해두면 알아서 가져가서 쓰게 되어있음
    @Bean
    ThreadPoolTaskExecutor threadPoolExecutor(){
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);//기본으로 만드는거; 첫번째 스레드 요청이 올 때; jmx를 통해 런타임에도 값 수정이 가능
        te.setQueueCapacity(200);//기본 풀이 다 차면 큐에 먼저 차
        te.setMaxPoolSize(100);//큐가 꽉 찼을 때 에러나는데 그걸 막아주려고 하는 설정
        te.setThreadNamePrefix("async-");
        te.initialize();
        return te;
    }

    public static void main(String[] args) {
        //try with resource로 실행하면 작업 후 알아서 종료
        try(ConfigurableApplicationContext c = SpringApplication.run(Spring.class, args)){

        }
    }

    @Autowired MyService myService;

    //모든 빈 등록 마치면 바로 실행 됨
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
            log.info("Exit");
        };
    }
}

여기서 주의할 점.

ThreadPoolTaskExecutor에 max pool size에 설정하는 값은 큐가 다 차고 나서 늘어가는 풀 사이즈이다.

https://www.baeldung.com/java-threadpooltaskexecutor-core-vs-max-poolsize

 

참고로 HttpServletRequest/Response -> 자바의 InputStream 을 구현한 것으로, blocking방식이다.


2. using Spring Web - default

우선 서버 설정 값

#서블릿 스레드
server.tomcat.threads.max=1
#작업스레드풀?
spring.task.execution.pool.core-size=100
@Slf4j
public class LoadTest {
    static AtomicInteger cnt = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/callable";

        StopWatch sw = new StopWatch();
        sw.start();

        for(int i = 0; i <100; i++){
            es.execute((() -> {
                int idx = cnt.addAndGet(1);
                log.info("thread {}", idx);
                StopWatch s1 = new StopWatch();
                s1.start();
                rt.getForObject(url, String.class);
                s1.stop();
                log.info("elapsed:{} -> {}", idx, s1.getTotalTimeSeconds());
            }));
        }
     
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        sw.stop();
        log.info("total time {}", sw.getTotalTimeSeconds());
    }
}
@RestController
public static class MyController{
    @GetMapping("/callable")
    public String callable() throws InterruptedException {
        log.info("async");
        Thread.sleep(2_000);
        return "hello";
    }
}

위 요청은 아래 그림과 같이 표현된다.

load test는 100개의 스래드를 만들어 거의 동시에 요청했으나, 순차 처리로 인해 총 소요 시간은 200초가 넘는다.

...
13:04:56.428 [pool-1-thread-50] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.428 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
13:04:56.427 [pool-1-thread-90] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/callable
...
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:04.993 [pool-1-thread-61] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:04.994 [pool-1-thread-61] INFO toby.live.lecture4.LoadTest - elapsed:61 -> 188.612548654
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:06.999 [pool-1-thread-82] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:06.999 [pool-1-thread-82] INFO toby.live.lecture4.LoadTest - elapsed:82 -> 190.618529907
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:09.002 [pool-1-thread-71] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:09.003 [pool-1-thread-71] INFO toby.live.lecture4.LoadTest - elapsed:71 -> 192.622624014
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:11.009 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:11.009 [pool-1-thread-32] INFO toby.live.lecture4.LoadTest - elapsed:32 -> 194.629844982
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:13.012 [pool-1-thread-35] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:13.013 [pool-1-thread-35] INFO toby.live.lecture4.LoadTest - elapsed:35 -> 196.631976988
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:15.015 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:15.016 [pool-1-thread-59] INFO toby.live.lecture4.LoadTest - elapsed:59 -> 198.634385976
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
13:08:17.018 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
13:08:17.019 [pool-1-thread-65] INFO toby.live.lecture4.LoadTest - elapsed:65 -> 200.638424197

/callable 쪽은 요청을 거의 동시에 100개의 요청을 받았지만 일반 동기 방식이므로 하나씩 처리한다.

...
2022-03-17 13:08:06.998  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:09.002  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:11.008  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:13.012  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:08:15.014  INFO 22518 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : async

 

3. callable

/callable 쪽만 소스를 수정한다.

@RestController
public static class MyController{
    @GetMapping("/callable")
    public Callable<String> callable() throws InterruptedException {
        log.info("callable");
        //Callable Interface 익명 클래스
        return () -> {
            log.info("async");
            Thread.sleep(2_000);
            return "hello";
        };
    }

callable은 요청을 받으면 별도의 스레드를 생성해 일을 병렬로 처리한다.

100개의 스래드가 거의 동시에 요청을 보내 100개의 작업 스래드가 받아서 동시에 처리하기 때문에 총 소요시간도 2초 남짓이다.

...
13:33:24.694 [pool-1-thread-63] INFO toby.live.lecture4.LoadTest - elapsed:63 -> 2.289299445
13:33:24.694 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 2.291355199
13:33:24.695 [pool-1-thread-67] INFO toby.live.lecture4.LoadTest - elapsed:67 -> 2.292713823
13:33:24.695 [pool-1-thread-81] INFO toby.live.lecture4.LoadTest - elapsed:80 -> 2.289499954
13:33:24.695 [pool-1-thread-3] INFO toby.live.lecture4.LoadTest - elapsed:3 -> 2.290040178
13:33:24.695 [pool-1-thread-57] INFO toby.live.lecture4.LoadTest - elapsed:57 -> 2.292708376
13:33:24.696 [pool-1-thread-44] INFO toby.live.lecture4.LoadTest - elapsed:44 -> 2.29307173
13:33:24.696 [main] INFO toby.live.lecture4.LoadTest - total time 2.307467369

/callable이 1개의 서블릿 스레드로 받았지만 별도의 작업 스래드로 바로 토스하여 일을 하는 모습..

...
2022-03-17 13:33:22.624  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.624  INFO 23081 --- [        task-98] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:33:22.625  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.625  INFO 23081 --- [        task-99] toby.live.lecture4.SpringWeb             : async
2022-03-17 13:33:22.625  INFO 23081 --- [nio-8080-exec-1] toby.live.lecture4.SpringWeb             : callable
2022-03-17 13:33:22.625  INFO 23081 --- [       task-100] toby.live.lecture4.SpringWeb             : async

 


4. DeferredResult

DeferredResult란 “지연된 결과”를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 비동기 작업이다. 

기존에 /callable api가 있던 곳에 아래와 같이 신규 api를 추가해준다.

Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();

@GetMapping("/dr")
public DeferredResult<String> dr(){
    log.info("dr");
    //DR은 setResult가 오기전 까지 응답을 끊지 않고 대기, 그래서 테스트 시 타임아웃 시간을 길게
    //결과가 오면 바로 return
    DeferredResult<String> dr = new DeferredResult<>(600_000L);
    results.add(dr);
    return dr;
}

@GetMapping("/dr/count")
public String drcount(){
    return String.valueOf(results.size());
}

@GetMapping("/dr/event")
public String drevent(String msg){
    for(DeferredResult<String> dr: results){
        dr.setResult("hello " + msg);
        results.remove(dr);
    }
    return "OK";
}

load test에 /dr을 쏘게끔 설정하고 실행하면 100개의 요청을 우르르 보내지만 아무 응답을 받지 않고 대기하는 모습을 보이는데

이때 /dr/count를 쏴보면 100개의 요청이 큐에 들어있는 것을 확인할 수 있다.

그 상태에서 /dr/event?msg=today 를 요청하면 그제야 100개의 응답이 한 번에 온다. setResult라는 이벤트를 주면 리턴하는 방식이다.

...
13:48:17.039 [pool-1-thread-98] INFO toby.live.lecture4.LoadTest - elapsed:98 -> 213.022709247
13:48:17.039 [pool-1-thread-30] INFO toby.live.lecture4.LoadTest - elapsed:30 -> 213.027111781
13:48:17.039 [pool-1-thread-16] INFO toby.live.lecture4.LoadTest - elapsed:16 -> 213.026715767
13:48:17.040 [pool-1-thread-1] INFO toby.live.lecture4.LoadTest - elapsed:1 -> 213.025866684

 

5. ResponseBodyEmitter

ResponseBodyEmitter는 하나의 요청에 대한 응답을 여러번에 나눠서 보낼 때 사용된다.

//여러번에 나눠서 데이터를 보낸다
@GetMapping("/emitter")
public ResponseBodyEmitter emitter(){
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    Executors.newSingleThreadExecutor().submit(() -> {
        try {
            for(int i =0; i<=50; i++){
                emitter.send("<p> stream " +i+ "</p>");
                Thread.sleep(100);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    return emitter;
}

/emitter를 브라우저로 실행하면 stream 문구가 100ms 마다 계속 뜨는 것을 볼 수 있다. 위에 상태 표시에도 로딩으로 되어 있어 계속 응답을 받아오고 있음을 알 수 있다.


참고

https://jongmin92.github.io/2019/03/31/Java/java-async-1/

 

자바와 스프링의 비동기 기술

해당 포스팅은 토비님의 토비의 봄 TV 8회 스프링 리액티브 프로그래밍 (4) 자바와 스프링의 비동기 기술 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다. 실습 코드들은 Inte

jongmin92.github.io

 

728x90
반응형
반응형

이전 글: 2022.01.28 - [개발/spring] - [actuator] git info를 health에 포함하기

 

[actuator] git info를 health에 포함하기

환경: java11, springboot2.6.2, gradle7.1 spring actuator? 스프링 부트 기반의 애플리케이션에서 제공하는 여러가지 정보를 쉽게 모니터링하게 하는 라이브러리 DB연결상태, disk space, rabbit, reddis, 커스..

bangpurin.tistory.com

 

이전에 프로젝트에 actuator를 적용하여 /health url을 통해 배포된 소스의 해쉬, 브랜치, 잘 떠있는지 유무 등을 알 수 있었다. 

스프링 actuator는 metrics를 통해 애플리케이션의 현 자원 상태(스레드 풀 상태, db 커넥션 상태, jvm gc상태 등)를 알 수 있게 하는데, 이를 주기적으로 요청하여 그 흐름을 관찰할 수도 있다. 특히 스프링에서 지원하는 prometheus를 추가적으로 설정하면 prometheus서버가 원하는 포맷으로 변환해주기 때문에 쉽게 prometheus서버와 연동이 가능하다.

 

오늘은 서비스 모니터링에 주로 쓰이는 prometheus를 추가적으로 설정해본다.

이전 글에서 설정한 그대로 작업 시작한다.

 

1. gradle build에 추가

//기존에 있었음
implementation 'org.springframework.boot:spring-boot-starter-actuator'
//actuator prometheus 추가
implementation("io.micrometer:micrometer-registry-prometheus")

 

2. application.properties 추가

//prometheus 추가
management.endpoints.web.exposure.include=health, shutdown, info, prometheus
//이미 있었음
management.endpoint.health.enabled=true
//추가
management.endpoint.prometheus.enabled=true

 

이렇게 하고 서버를 재시작하면 /actuator/prometheus 가 활성화된다.

/actuator/prometheus

 

이외 추가적인 정보에 대한 설정은 prometheus.yml을 생성하여 작성하면 된다.


참고

https://pyxispub.uzuki.live/?p=1810 

 

Spring Boot + Actuator + Micrometer로 Prometheus 연동하기

이제까지 블로그에서 Prometheus, Grafana 에 대해 여러 번 다룬 적이 있었다. Monitoring with cAdvisor + Prometheus + Grafana (https://pyxispub.uzuki.live/?p=1764)Alert with Grafana(https://pyxispub.uzuki.live/?p=1779) 두번째 글 까

pyxispub.uzuki.live

https://happycloud-lee.tistory.com/217

 

[SC11] Spring Boot Actuator 이란 ?

1. Spring Boot Actuator 이해 1) WHY? 각 마이크로서비스는 고유의 목적을 가지고 개발되고 운영됩니다. 하지만 모든 마이크로서비스에는 공통으로 요구되는 기능이 있습니다. 예를 들어 Health Check와 상

happycloud-lee.tistory.com

 

728x90
반응형
반응형

callable vs runnable interface

  Runnable interface Callable interface
java version
package
java 1.0 ~
java.lang
java 1.5 ~
java.util.concurrent
return 계산 결과를 받을 수 없음 계산 결과 받을 수 있음(a generic value V)
throw checked exception을 throw 불가 checked exception을 throw 가능
override method run() call()

https://www.geeksforgeeks.org/difference-between-callable-and-runnable-in-java/

 

Difference Between Callable and Runnable in Java - GeeksforGeeks

A Computer Science portal for geeks. It contains well written, well thought and well explained computer science and programming articles, quizzes and practice/competitive programming/company interview Questions.

www.geeksforgeeks.org

 

자바 비동기 맛보기

0. ExecutorService 특징

  • es.execute vs es.submit 
  • 시간이 오래 걸리는 작업을 할 때는 submit 추천. future object를 리턴하기 때문에 나중에 get으로 꺼낼 수 있음(get은 블로킹!)
  • 그냥 스레드의 병렬 처리만 필요하면 execute
  execute() submit()
accept Runnable Runnable and Callable
declared in Executor interface ExecutorService interface
return void a Future object
  • es.shutdown 은 graceful shutdown이라서 할 거 다 할 때까지 기다림
  • es.shutdownNow 가 강제 종료
  • es.awaitTermination 은 타임아웃을 기다리는 용도로 실제 shutdown 시키지는 않음
  • 따라서 shutdown 먼저 걸로 awaitTermination을 써야 함

 

1. Future class

public static void main(String[] args) throws InterruptedException, ExecutionException {
    //스래드를 새로 만들고 반납하는 것이 비용이 많이 들어가니, 풀이 나옴
    //맥시멈 제한없고 처음에 스레드 없음 요청할 때 새로 만들고 다음 요청 시 기존 만든 스레드 실행
    ExecutorService es = Executors.newCachedThreadPool();
    //es.execute -> void

    //Future 비동기 핸들러지 결과물은 아님
    Future<String> f = es.submit(() -> {
        Thread.sleep(5_000);
        log.info("helloo");
        return "hello";
    });
    //f.isDone() -> non blocking method;
    System.out.println(f.isDone());
    log.info("before");
    //f.get -> blocking method
    System.out.println(f.get()); //비동기가 완성될 때 까지 블로킹(20초 기다림)
    log.info("exit");//맨 마지막
    System.out.println(f.isDone());
}

 

2. FutureTask class

Future 자체를 object로 만들어줌

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    FutureTask<String> f = new FutureTask<String>(() -> {
       Thread.sleep(3_000);
       log.info("async");
       return "hello";
    }){//익명클래스
        @Override
        protected void done() { //다 하면 이거해; f.get 할 필요없음
            try {
                System.out.println(get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    };

    es.execute(f);
    System.out.println(f.isDone());
    //System.out.println(f.get());
    es.shutdown(); //다 끝나면 종료
}

 

3. FutureTask 확장

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    CallbackFutureTask f = new CallbackFutureTask(() -> {
        Thread.sleep(3_000);
        if(1 == 1){
            throw new RuntimeException(">async error");
        }
        log.info("async");
        return "hello";
    }, s -> System.out.println("success:" + s)
    , e -> System.out.println(">error: " + e.getMessage())
    );

    es.execute(f);
    es.shutdown(); //다 끝나면 종료
}

interface SuccessCallback{
    void onSuccess(String result);
}

interface ExceptionCallback{
    void onError(Throwable t);
}

public static class CallbackFutureTask extends FutureTask<String>{
    SuccessCallback sc;
    ExceptionCallback ec;

    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
        super(callable);
        //if(sc == null) throw null; //null point exception
        this.sc = Objects.requireNonNull(sc); //if null -> NPE
        this.ec = Objects.requireNonNull(ec);
    }

    @Override
    protected void done() {
        try {
            sc.onSuccess(get());
        } catch (InterruptedException e) { //종료 시그널
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) { // 찐에러
            ec.onError(e.getCause());
        }
    }
}
728x90
반응형
반응형

한 트랜젝션에서 서로 다른 DB에 update/insert 작업을 연속적으로 하게 될 때 이를 하나의 트랜젝션으로 보존할 수 있도록 스프링에서는 ChainedTransactionManager라는 클래스를 제공해주었다.

물론 이는 1, 2 ->  2, 1 구조로 완벽한 commit/rollback 로직은 아니었지만, 대부분/정상적인/예측 가능한 로직 상 사용 가능한 대안이었다.

 참고) 1, 2 ->  2, 1 구조란? 

위와 같은 구조로 2 커밋 후 1 롤백 시, 2가 롤백되지 않는 단점이 있다.

참고) 완벽한 로직이 아닌 이유: https://taes-k.github.io/2020/06/09/chained-transaction-manager/

 

ChainedTransactionManager 데이터소스 트랜잭션 연결하기

다중 데이터소스 트랜잭션 연결 요새는 MSA로 시스템을 구축하여 서버와 DB도 모두 각각 분리하여 마이크로하게 시스템 설계를 하는 추세이지만, 서비스에 따라 여러가지 이유로인해 여러개의 Da

taes-k.github.io

 

허나 springboot 2.6.2에서 사용하려고 봤더니 아래와 같이 deprecated 되었다고 나오는 것이 아닌가..

//deprecated since 2.5
@Deprecated
public class ChainedTransactionManager implements PlatformTransactionManager

 

그래서 대안이 뭔고 봤더니, TransactionSynchronization를 사용하라고 한다.

Instead of using ChainedTransactionManager for attaching callbacks to transaction commit (pre commit/post commit), either register a TransactionSynchronization to explicitly follow transaction cleanup with simplified semantics in case of exceptions.
See Also: TransactionSynchronization.beforeCommit(boolean), TransactionSynchronization.afterCommit()

 

저게 어떤 역할을 하는지, 어떻게 사용하는지는.. 좀 더 공부가 필요하다.

 


참고)

chainedTransactionConfig? https://techfinanceworld.com/?p=494 

 

Chained Transaction Manager in Spring

Suppose we have multiple databases to which we need to query under a spring transaction. So just a @transactional annotation against a single database transaction manager won't work. So for multi resource, we can use XA i.e; instead of multiple transaction

techfinanceworld.com

스프링 진영에서 없애려는 이유 https://github.com/spring-projects/spring-data-commons/issues/2232

 

Deprecate ChainedTransactionManager [DATACMNS-1817] · Issue #2232 · spring-projects/spring-data-commons

Mark Paluch opened DATACMNS-1817 and commented ChainedTransactionManager is the primary class in org.springframework.data.transaction that is used for multi-transactionmanager arrangements. It is u...

github.com

 

728x90
반응형
반응형

환경: springboot2.6.2 / java 11

로그 레벨을 DEBUG로 프로젝트를 실행하니 아래와 같은 에러가 빵빵 지나가는 것이 아닌가..

grpc를 사용하는 프로젝트라, grpc server가 내려가 있어서 발행하는 것이 아닌가 싶어서 기동 후 다시 봤는데도 마찬가지였다.

java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled
	at io.grpc.netty.shaded.io.netty.util.internal.ReflectionUtil.trySetAccessible(ReflectionUtil.java:31)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$4.run(PlatformDependent0.java:239)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0.<clinit>(PlatformDependent0.java:233)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.isAndroid(PlatformDependent.java:294)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.<clinit>(PlatformDependent.java:93)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:223)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:210)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.cached(AsciiString.java:1401)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<clinit>(AsciiString.java:48)
	at io.grpc.netty.shaded.io.grpc.netty.Utils.<clinit>(Utils.java:78)
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)
    
    ...
    
    
    
java.lang.IllegalAccessException: class io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @61874b9d
	at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:361)
	at java.base/java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:591)
	at java.base/java.lang.reflect.Method.invoke(Method.java:558)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6.run(PlatformDependent0.java:353)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0.<clinit>(PlatformDependent0.java:344)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.isAndroid(PlatformDependent.java:294)
	at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.<clinit>(PlatformDependent.java:93)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:223)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<init>(AsciiString.java:210)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.cached(AsciiString.java:1401)
	at io.grpc.netty.shaded.io.netty.util.AsciiString.<clinit>(AsciiString.java:48)
	at io.grpc.netty.shaded.io.grpc.netty.Utils.<clinit>(Utils.java:78)
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)

 

원인: 자바 11부터 Object의 접근권한이 더이상 증가하지 않는다는 에러. 에러가 아예 안 나려면(정상 액션을 원한다면) 자바 8을 사용하라는데, 발전하는 IT 시대에 역행은 오버라고 생각한다.. 자바 11에 AccessibleObject라는 새로운 클래스가 생겼으니, 아마 관련 라이브러리들이 곧 수정하지 않을까 하는 생각..

https://stackoverflow.com/questions/60241857/java-lang-unsupportedoperationexception-reflective-setaccessibletrue-disabled

 

java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled

When I run my Ktor application with gradle run then I've got the following exception: 19:21:11.795 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default l...

stackoverflow.com

 

해결:

관련 에러에 대해 검색해보면 이것저것 많이 나오는데, 특히 뜰 때 옵션에 아래 등과 같은 옵션을 주라고 한다.

-Dio.netty.tryReflectionSetAccessible=true

설정해보았지만 여전히 에러같이 생긴 친구들이 지나가서 굉장히 신경 쓰인다..

사실 저 로그는 에러라기보다는 디버그에 가깝다(고 한다..). 그래서 그냥 해당 로그 레벨을 ERROR로 바꿔버렸다..

<logger name="io.grpc.netty" level="ERROR"/>

 

사실 에러(?)를 무시하는 격이라 좀 찝찝하긴 하지만, 아직까지는 더 나은 방안은 없어 보인다. 기다려보자.

728x90
반응형
반응형

 

api gateway란

  • api management tool sits between client and backend
  • reverse proxy
An API gateway is set up in front of the microservices and becomes the entry point for every new request being executed by the app. It simplifies both the client implementations and the microservices app.

 

기능들

  1. TLS termination
    • ssl /tls 1.3 지원
  2. client authentication
    1. auth_jwt 로 키고/켜고 가능
    2. auth_jwt_key_file 로 저장된 키파일(.jwk)불러올 수 있음
    3.  키가 유효하면 프록시 됨
  3. fine-grained access control
    1. jwt_claim_uid -> 토큰 안 uri 값 가져올 수 있음, 가져와서 더 상세한 조건으로 제한 가능
  4. request routing
    • uri별로 어디로 프록시할지 정할 수 있음
    • proxy_pass 와 =,~ 등을 이용한 정규식 사용 가능
  5. rate limiting
    • limit_req_zone
    • 속도 제한 가능(ex. 5r/m = 5requests per a minute); 사실 횟수 제한에 가까움
    • 이를 초과하면 limit_req_status로 설정해둔 status code가 response로 내려감
  6. load balancing

 

 

728x90
반응형

'서버 세팅 & tool > nginx' 카테고리의 다른 글

[이슈해결][apache] 304 NOT_MODIFIED  (0) 2023.10.12
[nginx] WAF  (0) 2022.03.30
[nginx] 실전 cors 해결하기  (0) 2022.03.14
[nginx] load balancing  (0) 2022.03.11
[nginx] reverse proxy  (0) 2022.03.03
반응형

문제: nginx -> springboot2.6.2 swagger로 api요청 시 CORS 문제로 실행 안됨

 

1. spring 설정으로 addCorsMapping 해주어야 함

@Component
public class WebMvcConfig implements WebMvcConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/api/**");
    }
}

 

2. nginx.conf 로 프락시 되는 주소를 localhost -> 해당 도매인으로 수정

#user  nobody;
worker_processes  1;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ';

    server {
        listen       80;
        server_name  alpha.example.com;
        access_log  /home1/test/logs/access_log.log  main;

        location / {
           #proxy_pass http://localhost:8600;
            proxy_pass http://alpha.example.com:8600;
        }
    }
}
728x90
반응형

'서버 세팅 & tool > nginx' 카테고리의 다른 글

[이슈해결][apache] 304 NOT_MODIFIED  (0) 2023.10.12
[nginx] WAF  (0) 2022.03.30
[nginx] API gateway  (0) 2022.03.14
[nginx] load balancing  (0) 2022.03.11
[nginx] reverse proxy  (0) 2022.03.03
반응형

 

로드밸런싱(load balancing)

  • 클라이언트와 백엔드 서비스 사이에 위치
  • 클라이언트로부터 요청을 받고 해당 요청을 수행할 수 있는 서비스나 인스턴스로 요청을 균등하게 전달하는 방법

 

서버의 로드밸런싱을 해주는 로드밸런서는 여러 장비/프로그램이 있을 수 있겠지만, 여기서는 nginx의 로드밸런싱에 대해 다루겠다.

https://docs.nginx.com/nginx/admin-guide/load-balancer/http-load-balancer/

 

HTTP Load Balancing | NGINX Plus

HTTP Load Balancing Load balance HTTP traffic across web or application server groups, with several algorithms and advanced features like slow-start and session persistence. Overview Load balancing across multiple application instances is a commonly used t

docs.nginx.com

  • ip/port/domain 기반한 group/pool/service단위로 로드발랜싱 가능
  • proxy_pass 에 그룹으로 지정
  • http context 안에 든 upstream 에서 설정

 

로드밸런싱 룰

  • 사용자가 정한 방식에 의해 적절한 서버를 선택, 기본값은 라운드 로빈 알고리즘(균등)
  • weight를 따로줘서 가중치를 줄 수 있음. 서버가 받는 요청은 가중치에 비례함.
  • max_conns 최대 연결 서버 조정 가능, 나머지는 큐에
    • 관련하여 queue / timeout 설정 가능
  • 특정횟수만큼 실패하면 빠질 수 있음  max_fails/fail_timeouts

 

로드 밸런싱 알고리즘

주의) 로드밸런싱 알고리즘 중 세션을 보존할 수 있는게 있는데 새 서버를 추가하거나 제거하면 해시 키가 손실될 가능성이 높아서 세션 정보가 무효가 될 가능성이 있음

  1. round robin
  2. hash
    • hash $request_uri 
    • -> /example 이 요청 uri 일 경우 항상 동일한 서버로 
    • 가중치도 같이 줄 수 있음
  3. ip_hash
    • 고객의 ipv4/v6를 기반으로 해시 키 잡음. 동일한 서버로 라우팅함
  4. least_conn
    • 제일 놀고 있는 서버에게 감
  5. least_time
    • 가장 빠른 응답을 하는 서버에게, 헤더의 평균 응답시간 활
  6. 랜덤

 

세션 유지

(curl로하면 쿠키 저장이 안되서 바로 보기가 불가하고 브라우저 테스트 시 확인가능)

  • sticky cookie
    • 최초 요청 시 쿠키 생성하여 헤더에 세팅, 이후 요청에서는 헤더의 쿠키 값을 보고 해당 서버로 보냄
    • 쿠키 만료 시간, 도메인, path 설정 가능
  • sticky route 고정 경로
    • nginx가 서버의 route 변수(쿠키나 uri에서 가져옴)를 확인하여 프록시 할 서버를 식별
  • sticky learn
    • reqeust/response를 검사해서 어디로 프록시할지 결정

 

example .conf

728x90
반응형

'서버 세팅 & tool > nginx' 카테고리의 다른 글

[이슈해결][apache] 304 NOT_MODIFIED  (0) 2023.10.12
[nginx] WAF  (0) 2022.03.30
[nginx] API gateway  (0) 2022.03.14
[nginx] 실전 cors 해결하기  (0) 2022.03.14
[nginx] reverse proxy  (0) 2022.03.03
반응형

세 번째 강의 시작한다.

 

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    pub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [main] DEBUG toby.live.SchedulerEx  - onNext:1
2    [main] DEBUG toby.live.SchedulerEx  - onNext:2
2    [main] DEBUG toby.live.SchedulerEx  - onNext:3
2    [main] DEBUG toby.live.SchedulerEx  - onNext:4
2    [main] DEBUG toby.live.SchedulerEx  - onNext:5
2    [main] DEBUG toby.live.SchedulerEx  - onComplete
EXXXXIT

위 코드를 돌리면 아래와 같이 로그가 찍힌다. 보이는 것처럼 main 스레드가 모든 일을 다 하고 있고, 만약 중간에 블로킹이 걸리면 main스레드는 진행을 하지 못하게 된다. pub - sub 사이의 일은 다른 스레드가 하게끔 하고 main스레드는 지나치게 하려면 어떻게 할까

 

subscribeOn

  • 주는자(publisher)가 느리거나 예측 불가할 경우 & 처리(consumer)가 빠를 때
  • subscriber를 별도의 스레드로 분리
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(); //1개 스레드, 몰리면 큐에 넣고 순차처리
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    subOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
2    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

아까랑 다르게 main 스레드에서 exit을 먼저 하고 나머지 스레드로 작업한다.

 

publishOn

  • 데이터 생성(publisher)은 빠른데 처리하는 곳(consumer)이 느릴 때 
  • publisher를 별도의 스레드로 분리
  • 데이터 생성은 메인에서 빨리해 근데 가져가는 건 느리니까 별도에서 진행해
public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(); //코어 갯수가 하나라서 큐에 넣어서 안꼬임

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
0    [main] DEBUG toby.live.SchedulerEx  - onSubscribe
1    [main] DEBUG toby.live.SchedulerEx  - subscription request
EXXXXIT
4    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:1
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:2
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:3
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:4
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onNext:5
5    [pool-1-thread-1] DEBUG toby.live.SchedulerEx  - onComplete

이 친구는 onSubscribe, exit까지 main 스레드에서 해버리고 나머지를 할당한 별도의 스레드에서 진행한다.

 

publishOn / subscribeOn 개념을 둘 다 사용할 수도 있다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                log.debug("subscription request");
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    };

    Flow.Publisher<Integer> subOnPub = subscriber -> {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){ //1개 스레드, 몰리면 큐에 넣고 순차처리
            @Override
            public String getThreadNamePrefix() {
                return "subOn-";
            }
        }); 
        es.execute(()->{
            pub.subscribe(subscriber);
        });
    };

    Flow.Publisher pubOnPub = subscriber -> {
        subOnPub.subscribe(new Flow.Subscriber<Integer>() {  //중개 operator
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                @Override
                public String getThreadNamePrefix() {
                    return "pubOn-";
                }
            });

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }
            // 별개의 스래드로 아래 처리하고 싶음
            @Override
            public void onNext(Integer item) {
                es.execute(() -> subscriber.onNext(item));
            }

            @Override
            public void onError(Throwable throwable) {
                es.execute(() -> subscriber.onError(throwable));
            }

            @Override
            public void onComplete() {
                es.execute(() -> subscriber.onComplete());
            }
        });
    };

    pubOnPub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
    System.out.println("EXXXXIT");
}
EXXXXIT
0    [subOn-1] DEBUG toby.live.SchedulerEx  - onSubscribe
2    [subOn-1] DEBUG toby.live.SchedulerEx  - subscription request
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:1
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:2
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:3
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:4
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onNext:5
4    [pubOn-1] DEBUG toby.live.SchedulerEx  - onComplete

각 스레드에 이름을 주어 로그를 확인해봤더니, 메인은 메인대로 진행하였고(exxit) onSubscribe, request 까지는 subOn 스레드에서 진행하였고 나머지는 pubOn 스레드에서 진행함을 알 수 있다.

 

원리는 이렇고, flux로 구현하면 아래와 같다. 상황에 따라 publishOn/subscribeOn을 하나만 쓸 수도, 모두 쓸 수도 있겠다.

public static void main(String[] args) {
    Flux.range(1, 10) //1부터 10개
        .publishOn(Schedulers.newSingle("pub"))
        .log()
        .subscribeOn(Schedulers.newSingle("sub"))
        .subscribe(System.out::println); //onNext
    System.out.println("exit");
}
exit
32   [sub-1] INFO  reactor.Flux.PublishOn.1  - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
33   [sub-1] INFO  reactor.Flux.PublishOn.1  - | request(unbounded)
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(1)
1
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(2)
2
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(3)
3
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(4)
4
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(5)
5
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(6)
6
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(7)
7
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(8)
8
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(9)
9
34   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onNext(10)
10
35   [pub-2] INFO  reactor.Flux.PublishOn.1  - | onComplete()

 

flux.interval

메인 스레드 종료 후 별도 스레드에서 200ms마다 하나씩 숫자를 받아서 실행. 5초 지나면 종료되는 코드 작성.

public static void main(String[] args) throws InterruptedException {

    //user thread 하나라도 있으면 강종안함
    //daemon thread 만 남으면 강종
    Flux.interval(Duration.ofMillis(200))//주기를 가지고 숫자를 증가; 무한으로
        .take(10) //10개 받으면 끝; 중지시킨다
        .subscribe(s -> log.debug("onNext::{}", s));

    log.debug("Exit");
    TimeUnit.SECONDS.sleep(5); //그래서 유저 스래드를 만들어줘야 실행됨
}
15   [main] DEBUG toby.live.FluxScEx  - Exit
223  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::0
419  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::1
619  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::2
818  [parallel-1] DEBUG toby.live.FluxScEx  - onNext::3
1015 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::4
1219 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::5
1419 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::6
1618 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::7
1817 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::8
2019 [parallel-1] DEBUG toby.live.FluxScEx  - onNext::9

 

이를 reactor stream 없이 구현한다면?

newSingleThreadScheduledExecutor.scheduleAtFixedRate 이란 함수와 cancel을 복합 구현하여 만들면 된다.

public static void main(String[] args) {
    Flow.Publisher<Integer> pub = sub -> {
        sub.onSubscribe(new Flow.Subscription() {
            int no = 0;
            boolean cancelled = false;

            @Override
            public void request(long n) {
                ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

                exec.scheduleAtFixedRate(() -> {  //interval
                    if(cancelled){
                        exec.shutdown();
                        return;
                    }
                    sub.onNext(no++);
                }, 0, 300, TimeUnit.MILLISECONDS); //죽을 때 까지 계속 저 주기로 날려
            }

            @Override
            public void cancel() {
                log.debug("cancel called");
                cancelled = true;
            }
        });
    };

    Flow.Publisher<Integer> takePub = sub -> {
        pub.subscribe(new Flow.Subscriber<Integer>() {
            int count = 0;
            Flow.Subscription subs;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                sub.onSubscribe(subscription);
                subs = subscription;
            }

            @Override
            public void onNext(Integer item) {
                sub.onNext(item);
                if(++count >= 5){
                    subs.cancel();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                sub.onError(throwable);
            }

            @Override
            public void onComplete() {
                sub.onComplete();
            }
        });
    };

    takePub.subscribe(new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            log.debug("onSubscribe");
            subscription.request(Long.MAX_VALUE); //다 보내
        }

        @Override
        public void onNext(Integer item) {
            log.debug("onNext:{}", item);
        }

        @Override
        public void onError(Throwable throwable) {
            log.debug("onError:{}", throwable);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    });
}
0    [main] DEBUG toby.live.IntervalEx  - onSubscribe
8    [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:0
309  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:1
607  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:2
908  [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:3
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - onNext:4
1206 [pool-1-thread-1] DEBUG toby.live.IntervalEx  - cancel called
728x90
반응형
반응형

 

nginx 구성

#기본 경로
/etc/nginx
#메인 설정
/etc/nginx/nginx.conf
	    include /etc/nginx/conf.d/*.conf;

#설정 값 경로 -> alphabetical order로 설정파일 읽음
/etc/nginx/conf.d

#nginx -T 로 설정값 읽는 순서 파악 가능

 

reverse proxy로서의 역할

  • server side proxy; client와 백엔드 사이에서 연결 중재
  • server블락 안의 proxy_pass 지시문 
  • 연결이 끊어지기 전에 연결을 닫고, 백엔드에 대한 새 연결을 하는데, 도중에 original 정보 손실 가능성 있음
  • 클라이언트 정보(ip, request, etc) 캡처 후 연결 종료
  • 백엔드로 새 연결 시 기존 정보를 헤더(proxy_set_header)에 담아서 전달
  • 웹서버 역할도 하고 애플리케이션을 호스팅도 하면서 프록시역할도 함

 

location / {
    proxy_pass http://127.0.0.1:8080;
}

이것이 기본적인 구문이다. 즉, 현재 서버에 / 로 시작하는 path로 접근하면, http://127.0.0.1:8080 으로 return 한다는 의미이다. 그러면 실제 사용자는 8080 포트에 접근하지 않았지만 8080 포트에 접근한 것과 동일한 효과가 발생한다.

nginx는 클라이언트가 접근한 path를 보고, 가장 적합한 location의 블럭으로 요청을 보내서 처리하게 된다. 주소에 대한 규칙 및 우선순위는 다음과 같다.

  1. = : 정규식이 정확하게 일치
    • location = /admin { . . . }
  2. ^~ : 정규식 앞부분이 일치
    • location ^~ /admin { . . . }
  3. ~ : 정규식 대/소문자 일치
    • location ~ /admin/ { . . . }
  4. ~* : 대/소문자를 구분하지 않고 일치
    • location ~* .(jpg|png|gif)
  5. /  : 하위 일치(prefix match)
    • location /admin { . . . }

 

예시는 아래 링크로 첨부한다.

https://www.digitalocean.com/community/tutorials/understanding-nginx-server-and-location-block-selection-algorithms

 

Understanding Nginx Server and Location Block Selection Algorithms | DigitalOcean

 

www.digitalocean.com


default.conf수정

새로운 로그파일에 해더 찍도록 설정

초기 default.conf

 

수정 default.conf

sudo nginx -s reload

conf파일을 수정한 경우 위 명령어로 반드시 리로드를 해줘야 한다.


참고: 원문 https://docs.nginx.com/nginx/admin-guide/web-server/reverse-proxy/

 

NGINX Reverse Proxy | NGINX Plus

NGINX Reverse Proxy Configure NGINX as a reverse proxy for HTTP and other protocols, with support for modifying request headers and fine-tuned buffering of responses. This article describes the basic configuration of a proxy server. You will learn how to p

docs.nginx.com

 

728x90
반응형

'서버 세팅 & tool > nginx' 카테고리의 다른 글

[이슈해결][apache] 304 NOT_MODIFIED  (0) 2023.10.12
[nginx] WAF  (0) 2022.03.30
[nginx] API gateway  (0) 2022.03.14
[nginx] 실전 cors 해결하기  (0) 2022.03.14
[nginx] load balancing  (0) 2022.03.11

+ Recent posts