반응형

2024.02.04 - [서버 세팅 & tool/kafka] - [windows] kakfa connect; mariadb 설치

사용하게 된 배경: 기존의 서버가 h2 디비를 사용하여 1 인스턴스 - 1 디비를 사용하고 있었는데, 이렇게 되면 여러 인스턴스를 띄울 때 디비가 분리되어 있어서 디비 싱크가 안 맞는 문제가 발생. 그래서 그 h2 디비를 사용하지 않고 kafka source/sink connect를 이용하여 kafka에 연결된 maria 디비(단일 디비)로 사용하려는 것.

 

connect source -> connect sink 연동하기 (db to db)

kafka source connect 추가; 마리아디비 사용

connect source는 8083 포트 사용

echo '{
"name" : "my-source-connect",
"config" : {
	"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
	"connection.url":"jdbc:mariadb://localhost:3306/mydb",
	"connection.user":"root",
	"connection.password":"1234",
	"mode": "incrementing",   //id 자동 증가
	"incrementing.column.name" : "id",
	"table.whitelist":"users",  // 변경사항 감지할 테이블
	"topic.prefix" : "my_topic_", //저장할 곳의 prefix
	"tasks.max" : "1"
	}
}

' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

데이터 전달 후 

상세를 확인해 봤는데.. 에러가 났었음

kafka connect 쪽 로그를 보니 같은 테이블 명이 여러 개라고..

 다른 데이터베이스에 있더라도 같은 이름의 테이블이면 안되는 듯..

테이블명을 바꾸거나 아래처럼 수정

다시 상태 확인 시 정상

 

이제 users 테이블에 데이터를 넣어주면 kafka에 해당 topic이 생긴 것을 확인할 수 있다.

consumer 도 확인해 보면 아래처럼 하나 추가된 것 확인

이 상태에서 insert 한번 더 해주면 consumer에 바로 다음 데이터가 추가되는 것을 볼 수 있다.

이 json을 포매터로 확인해 보면 다음과 같다.

fields에 필드에 대한 설명이 있고 payload에 실제 데이터가 들어있다. 나중에 topic을 이용하여 디비에 데이터를 저장하려고 하면 아래 포맷처럼 전달하여야 한다.

kafka sink connect 추가

echo '{
"name":"my-sink-connect",
"config":{
	"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
	"connection.url":"jdbc:mariadb://localhost:3306/mydb",
	"connection.user":"root",
	"connection.password":"1234",
	"auto.create":"true",  //디비와 연결하는데, 토픽과 같은 테이블 쓰겠다는 의미
	"auto.evolve":"true",
	"delete.enabled":"false",
	"tasks.max":"1",
	"topics":"my_topic_users"  //여기랑 연결
	}
}

'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

sink connect 생성 확인

이 상태에서 디비에 my_topic_users라는 sink table이 잘 생성되었는지 확인

이후에 users에 데이터를 입력하면 자동으로 my_topic_users에도 똑같이 쌓이는 것 확인

consumer 로그에도 해당 로그 잘 쌓임

디비에 직접 데이터를 추가하는 것 말고도 producer에 위 포맷으로 데이터를 전송하면 동일한 효과를 낼 수 있다.

이후에 users 테이블을 확인하면 해당 데이터는 없고, my_topic_users 테이블 확인하면 있는 것을 확인.

users(source) -> my_topic_users(sink) 테이블의 방향으로 연결되어 있는데, 이번에는 producer를 통해 직접 my_topic_users에 넣어줬으니 users 테이블에는 없는 것.

728x90
반응형

'서버 세팅 & tool > kafka' 카테고리의 다른 글

[windows] kakfa connect; mariadb 설치  (0) 2024.02.04
[windows] kafka 실행  (0) 2024.02.04

+ Recent posts