728x90
반응형
728x90
반응형
반응형

이전 작업:

2024.02.04 - [서버 세팅 & tool/kafka] - [windows] kakfa connect 연동

2024.02.08 - [개발/kafka] - [spring-kafka] producer, consumer 기초

환경: springboot 2.7.6, spring-kafka, java11

목표: 인스턴스 별로 하나씩 있던 디비를 공용 maria 디비로 전환, kafka connect 이용하여 source 쐈을 때 sink로 받아서 db에 저장

 

1. maria db 접속, 필요한 테이블 생성

2. pom.xml, application.yml 수정

<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>3.1.4</version>
</dependency>

3. kafka producer 추가

  • orders 이라는 sink connect로 전송
  • db에 데이터를 저장하는 것이라 kafka가 원하는 db 포맷으로 만들어야 함
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
    private final KafkaTemplate kafkaTemplate;

    private final List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price"));

    private final  Schema schema = Schema.builder().type("struct").fields(fields).optional(false).name("orders").build();

    public KafkaOrderDto send(String topic, OrderDto orderDto){

        Payload payload = Payload.builder().orderId(orderDto.getOrderId()).userId(orderDto.getUserId()).productId(orderDto.getProductId()).qty(orderDto.getQty()).unitPrice(orderDto.getUnitPrice()).totalPrice(orderDto.getTotalPrice()).build();
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonString = "";
        try{
            jsonString = mapper.writeValueAsString(kafkaOrderDto);
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, jsonString);
        log.info("kafka producer sent: {}" , kafkaOrderDto);
        return kafkaOrderDto;
    }
}

 

호출부, 여기서 앞부분이 sink topic이름

orderProducer.send("orders", orderDto);

4. kafka sink connector 추가

마리아 디비랑 연결되는 토픽이 orders 라고 정의하고 등록하는 것임

잘 생성되었나 확인

 

5. 이렇게 되면 해당 인스턴스가 여러가 떠 있어도 어떤 인스턴스가 디비 수정 건을 받았건 상관없이 모두 마리아 디비로 들어가서 단일로 관리가 가능

 

728x90
반응형

'개발 > kafka' 카테고리의 다른 글

[spring-kafka] producer, consumer 기초  (0) 2024.02.08
반응형

환경: springboot2.7.6, java11, h2 연결 

 

producer 정보 보내는 쪽

1. pom.xml 추가

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

2. producer kafka 연결 설정

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

3. 카프카를 사용하여 전송하려는 메세지 함수 설정

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    //tojson
    public OrderDto send(String kafkaTopic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonString = "";
        try{
            jsonString = mapper.writeValueAsString(orderDto);
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
        kafkaTemplate.send(kafkaTopic, jsonString);
        log.info("kafka producer sent: {}" , orderDto);
        return orderDto;
    }
}

 

 

consumer 정보 받는 

1. 동일

2. consumer kafka 연결 설정

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");//consumer grouping
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

 

3. 받는 listener 설정

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
    private final CatalogRepository catalogRepository;

    @KafkaListener(topics = "example-catalog-topic") //데이터가 전달되면 가져와서 실행
    public void updateQty(String kafkaMessage){
        log.info("kafka message: {}", kafkaMessage);
        Map<String, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<String, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        Catalog catalog = catalogRepository.findByProductId((String)map.get("productId"));
        if(catalog != null){
            catalog.setStock(catalog.getStock() - (Integer)map.get("qty"));
            catalogRepository.save(catalog);
        }
    }
}
728x90
반응형

+ Recent posts