개발/kafka
[spring-kafka] producer, consumer 기초
방푸린
2024. 2. 8. 13:13
반응형
환경: springboot2.7.6, java11, h2 연결
producer 정보 보내는 쪽
1. pom.xml 추가
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. producer kafka 연결 설정
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
3. 카프카를 사용하여 전송하려는 메세지 함수 설정
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
//tojson
public OrderDto send(String kafkaTopic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonString = "";
try{
jsonString = mapper.writeValueAsString(orderDto);
}catch (JsonProcessingException e){
e.printStackTrace();
}
kafkaTemplate.send(kafkaTopic, jsonString);
log.info("kafka producer sent: {}" , orderDto);
return orderDto;
}
}
consumer 정보 받는
1. 동일
2. consumer kafka 연결 설정
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");//consumer grouping
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
3. 받는 listener 설정
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
@KafkaListener(topics = "example-catalog-topic") //데이터가 전달되면 가져와서 실행
public void updateQty(String kafkaMessage){
log.info("kafka message: {}", kafkaMessage);
Map<String, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
Catalog catalog = catalogRepository.findByProductId((String)map.get("productId"));
if(catalog != null){
catalog.setStock(catalog.getStock() - (Integer)map.get("qty"));
catalogRepository.save(catalog);
}
}
}
728x90
반응형