https://debezium.io/documentation/reference/1.4/tutorial.html
# zookeeper
docker run -dit --name zookeeper \
-p 2181:2181 \
-p 2888:2888 \
-p 3888:3888 \
debezium/zookeeper:1.4
# kafka
docker run -dit --name kafka \
-p 9092:9092 \
--link zookeeper:zookeeper \
debezium/kafka:1.4
# mysql db
docker run -dit --name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.4
# kafka connect
docker run -dit --name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
debezium/connect:1.4
# mysql command line
docker run -it --rm --name mysqlterm \
--link mysql \
--rm mysql:5.7 sh -c '\
exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR"
-P"$MYSQL_PORT_3306_TCP_PORT"
-uroot
-p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.04 sec)
Register a connector to monitor the inventory database
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}'
curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
viewing a create event
docker run -it --rm --name watcher \
--link zookeeper:zookeeper \
--link kafka:kafka \
debezium/kafka:1.4 \
watch-topic -a -k dbserver1.inventory.customers
Utils
show topics
docker exec -it kafka /kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
show databases
docker exec -it postgres bash -c 'psql -U postgres inventory -c "\l;"'
show tables
docker exec -it postgres bash -c 'psql -U postgres inventory -c "\dt;"'
Complement
ingest to postgres
https://debezium.io/blog/2017/09/25/streaming-to-another-database/
docker run -d --name postgres \
-e POSTGRES_PASSWORD=password \
-p 5432:5432 \
postgres
# kafka connect 2
docker run -dit --name connect2 \
-p 8084:8083 \
-e GROUP_ID=2 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs2 \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets2 \
-e STATUS_STORAGE_TOPIC=my_connect_statuses2 \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link postgres:postgres \
debezium/connect-jdbc-postgres
# docker exec -it kafka /kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
__consumer_offsets
dbhistory.inventory
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
my_connect_configs
my_connect_configs2
my_connect_offsets
my_connect_offsets2
my_connect_statuses
my_connect_statuses2
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://localhost:8084/connectors/ -d '{
"name": "jdbc-sink2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "dbserver1.inventory.customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgres&password=password",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}'
HTTP/1.1 201 Created
Date: Thu, 18 Mar 2021 19:21:14 GMT
Location: http://localhost:8084/connectors/jdbc-sink
Content-Type: application/json
Content-Length: 446
Server: Jetty(9.4.20.v20190813)
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "dbserver1",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgres&password=password",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"name": "jdbc-sink"
},
"tasks": [],
"type": "sink"
}
# mysql db
docker run -dit --name server1 \
-p 3307:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.4
# kafka connect
docker run -dit --name connect3 \
-p 8085:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs3 \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets3 \
-e STATUS_STORAGE_TOPIC=my_connect_statuses3 \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link server1:server1 \
debezium/connect:1.4
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://localhost:8085/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "server1",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}'
Debezium UI
dummy db https://www.postgresqltutorial.com/postgresql-sample-database/
docker rm -f pg
docker run -dit \
--name pg \
-e POSTGRES_PASSWORD=admin \
-p 5432:5432 \
-v /root/data:/data \
postgres:10.16 postgres -c wal_level=logical
docker exec -it pg bash -c "psql -U postgres -c 'create database dvdrental'"
cmd="pg_restore -U postgres -d dvdrental /data/dvdrental.tar"
docker exec -it pg bash -c "$cmd"