728x90
반응형
728x90
반응형
반응형

2024.02.04 - [서버 세팅 & tool/kafka] - [windows] kakfa connect; mariadb 설치

사용하게 된 배경: 기존의 서버가 h2 디비를 사용하여 1 인스턴스 - 1 디비를 사용하고 있었는데, 이렇게 되면 여러 인스턴스를 띄울 때 디비가 분리되어 있어서 디비 싱크가 안 맞는 문제가 발생. 그래서 그 h2 디비를 사용하지 않고 kafka source/sink connect를 이용하여 kafka에 연결된 maria 디비(단일 디비)로 사용하려는 것.

 

connect source -> connect sink 연동하기 (db to db)

kafka source connect 추가; 마리아디비 사용

connect source는 8083 포트 사용

echo '{
"name" : "my-source-connect",
"config" : {
	"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
	"connection.url":"jdbc:mariadb://localhost:3306/mydb",
	"connection.user":"root",
	"connection.password":"1234",
	"mode": "incrementing",   //id 자동 증가
	"incrementing.column.name" : "id",
	"table.whitelist":"users",  // 변경사항 감지할 테이블
	"topic.prefix" : "my_topic_", //저장할 곳의 prefix
	"tasks.max" : "1"
	}
}

' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

데이터 전달 후 

상세를 확인해 봤는데.. 에러가 났었음

kafka connect 쪽 로그를 보니 같은 테이블 명이 여러 개라고..

 다른 데이터베이스에 있더라도 같은 이름의 테이블이면 안되는 듯..

테이블명을 바꾸거나 아래처럼 수정

다시 상태 확인 시 정상

 

이제 users 테이블에 데이터를 넣어주면 kafka에 해당 topic이 생긴 것을 확인할 수 있다.

consumer 도 확인해 보면 아래처럼 하나 추가된 것 확인

이 상태에서 insert 한번 더 해주면 consumer에 바로 다음 데이터가 추가되는 것을 볼 수 있다.

이 json을 포매터로 확인해 보면 다음과 같다.

fields에 필드에 대한 설명이 있고 payload에 실제 데이터가 들어있다. 나중에 topic을 이용하여 디비에 데이터를 저장하려고 하면 아래 포맷처럼 전달하여야 한다.

kafka sink connect 추가

echo '{
"name":"my-sink-connect",
"config":{
	"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
	"connection.url":"jdbc:mariadb://localhost:3306/mydb",
	"connection.user":"root",
	"connection.password":"1234",
	"auto.create":"true",  //디비와 연결하는데, 토픽과 같은 테이블 쓰겠다는 의미
	"auto.evolve":"true",
	"delete.enabled":"false",
	"tasks.max":"1",
	"topics":"my_topic_users"  //여기랑 연결
	}
}

'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

sink connect 생성 확인

이 상태에서 디비에 my_topic_users라는 sink table이 잘 생성되었는지 확인

이후에 users에 데이터를 입력하면 자동으로 my_topic_users에도 똑같이 쌓이는 것 확인

consumer 로그에도 해당 로그 잘 쌓임

디비에 직접 데이터를 추가하는 것 말고도 producer에 위 포맷으로 데이터를 전송하면 동일한 효과를 낼 수 있다.

이후에 users 테이블을 확인하면 해당 데이터는 없고, my_topic_users 테이블 확인하면 있는 것을 확인.

users(source) -> my_topic_users(sink) 테이블의 방향으로 연결되어 있는데, 이번에는 producer를 통해 직접 my_topic_users에 넣어줬으니 users 테이블에는 없는 것.

728x90
반응형

'서버 세팅 & tool > kafka' 카테고리의 다른 글

[windows] kakfa connect; mariadb 설치  (0) 2024.02.04
[windows] kafka 실행  (0) 2024.02.04
반응형

환경: windows11, java17, springboot2.7.6

 

kakfa connect를 통해 데이터를 import/export 가능

코드 없이 configuration으로 데이터 이동 가능

Standalone mode, distribution mode 지원

  • restfult API 지원
  • stream, batch 형태로 데이터 전송 가능
  • 커스텀 connector를 통한 다양한 플러그인 제공(파일, mysql, 등)

 

테스트를 위한 mariaDB 다운로드(in windows)

참고로 마리아디비는 mysql과 코어를 같이 사용한다.

디비 생성을 위해 cmd로 접근

소스에 아래 추가

<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>3.1.4</version>
</dependency>

소스 띄우고 h2 콘솔로 아래 입력

cool


MariaDB 설치

MacOS

$ brew install mariadb

시작, 종료, 상태확인

$ mysql.server start, mysql.server stop, mysql.server status

접속

$ mysql –uroot

데이터베이스 생성

mysql> create database mydb;

Access denied 발생 시

$ sudo mysql –u root

mysql> use mysql;

mysql> select user, host, plugin FROM mysql.user;

mysql> set password for 'root'@'localhost'=password('test1357’);

mysql> flush privileges;

Windows)

다운로드

mariadb-10.5.8-winx64.zip 파일 다운로드

데이터베이스 초기화

.\bin\mariadb-install-db.exe 

    --datadir=C:\Work\mariadb-10.5.8-winx64\data 

    --service=mariaDB 

    --port=3306 

    --password=test1357


kafka connect 다운로드

http://packages.confluent.io/archive

들어가서 confluent community 7.5.3.zip 다운로드, 압축 해제

 

jdbc connector 다운로드

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

 

kafka connect 실행

에러 발생 시 아래처럼 실행 파일 수정

마리아 디비 jar 파일을 kafka connect 폴더 아래로 이동

다시 실행. 그래도 에러

해당경로에 프로퍼티 파일이 없어서 나는 에러로 bat파일을 열어 경로 수정한다.

그리고 다시 실행하면 성공.

성공 후 토픽 리스트를 다시 조회하면, kafka connect 가 필요한 토픽이 추가된 것을 확인할 수 있다.

 

728x90
반응형

'서버 세팅 & tool > kafka' 카테고리의 다른 글

[windows] kakfa connect 연동  (0) 2024.02.04
[windows] kafka 실행  (0) 2024.02.04
반응형

환경: 윈도우11, kafka2.13-3.6.1

 

카프카는 공홈에서 받아주고, 압축을 풀어준다.

카프카/주키퍼 실행 시 아래와 같은 에러 발생한다.

확인해보니 kafka-server-start.bat실행 시 같은 폴더 안에 있는 kafka-run-class.bat를 참조하여 같이 실행시키는데 여기서 너무 많은 classpath를 호출하여 문제가 생기는 것. 윈도우 cmd는 명령줄에 8192글자까지 제한된다고 함..

 

해결방법은 보통 아래와 같이 두가지 방법이 있는 것 같은데,  실행 파일을 수정해 봤는데도 에러가 났음..

  1. kafka 폴더를 C드라이브 바로 밑으로 옮긴후에 실행 + 폴더명 변경
  2. 실행파일 수정

https://stackoverflow.com/questions/48834927/the-input-line-is-too-long-when-starting-kafka/50168530#50168530

 

The input line is too long when starting kafka

I'm trying to run Kafka message queue on Windows. I'm usin this tutorial - https://dzone.com/articles/running-apache-kafka-on-windows-os When i try to run it with comand - .\bin\windows\kafka-ser...

stackoverflow.com

 

그래서 우선 폴더를 C > Program files 아래에 두고, 파일 이름까지 바꿨는데도 또 에러 발생.

진짜 개복치

그래서 C > windows 아래로 옮겼더니 된다.. 진짜 말이 되나

주키퍼 포트: 2181

카프카 포트: 9092

 

 

카프카가 제공하는 샘플 데이터 producer/consumer

토픽에 데이터를 쌓아두면 그걸 구독하는 컨수머가 가져가는 방식

9092 카프카 서버에 토픽 생성; 멀티 클러스터링 시 파티션 값 사용 

windows의 경우 아래 명령어의 .sh 대신 .bat로 바꿔서 실행해야 함!

메시지 생산

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events

메시지 소비

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

 

실제 실행 시..

728x90
반응형

'서버 세팅 & tool > kafka' 카테고리의 다른 글

[windows] kakfa connect 연동  (0) 2024.02.04
[windows] kakfa connect; mariadb 설치  (0) 2024.02.04

+ Recent posts