반응형
이전 작업:
2024.02.04 - [서버 세팅 & tool/kafka] - [windows] kakfa connect 연동
2024.02.08 - [개발/kafka] - [spring-kafka] producer, consumer 기초
환경: springboot 2.7.6, spring-kafka, java11
목표: 인스턴스 별로 하나씩 있던 디비를 공용 maria 디비로 전환, kafka connect 이용하여 source 쐈을 때 sink로 받아서 db에 저장
1. maria db 접속, 필요한 테이블 생성
2. pom.xml, application.yml 수정
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>3.1.4</version>
</dependency>
3. kafka producer 추가
- orders 이라는 sink connect로 전송
- db에 데이터를 저장하는 것이라 kafka가 원하는 db 포맷으로 만들어야 함
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate kafkaTemplate;
private final List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
private final Schema schema = Schema.builder().type("struct").fields(fields).optional(false).name("orders").build();
public KafkaOrderDto send(String topic, OrderDto orderDto){
Payload payload = Payload.builder().orderId(orderDto.getOrderId()).userId(orderDto.getUserId()).productId(orderDto.getProductId()).qty(orderDto.getQty()).unitPrice(orderDto.getUnitPrice()).totalPrice(orderDto.getTotalPrice()).build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonString = "";
try{
jsonString = mapper.writeValueAsString(kafkaOrderDto);
}catch (JsonProcessingException e){
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonString);
log.info("kafka producer sent: {}" , kafkaOrderDto);
return kafkaOrderDto;
}
}
호출부, 여기서 앞부분이 sink topic이름
orderProducer.send("orders", orderDto);
4. kafka sink connector 추가
마리아 디비랑 연결되는 토픽이 orders 라고 정의하고 등록하는 것임
잘 생성되었나 확인
5. 이렇게 되면 해당 인스턴스가 여러가 떠 있어도 어떤 인스턴스가 디비 수정 건을 받았건 상관없이 모두 마리아 디비로 들어가서 단일로 관리가 가능
728x90
반응형
'개발 > kafka' 카테고리의 다른 글
[spring-kafka] producer, consumer 기초 (0) | 2024.02.08 |
---|