Radiofisik

my knowledge base

Синхронизация баз Debezium

В микросервисной архитектуре часто возникает задача синхронизации баз данных например для организации поиска по сущностям разных микросервисов одновременно. Как вариант решения - проект debezium

Топология

                   +-------------+
                   |             |
                   | PostgreSQL  |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          |  (Debezium, JDBC connectors)     |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  |   PostgreSQL   |
                  |                |
                  +----------------+


Используем компоненты

Использование

Запустим синхронизацию

# Start the application
export DEBEZIUM_VERSION=1.0
docker-compose up -d

# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://docker:8083/connectors/ -d @sink.json

# Start source connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://docker:8083/connectors/ -d @source.json

в базе источнике

docker-compose exec postgressrc bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from inventory.customers"'

в базе назначения

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'

список топиков

docker-compose exec kafka bash -c 'bin/kafka-topics.sh --zookeeper zookeeper:2181 --list'

послушать изменения

docker-compose exec kafka bash -c 'bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic customers --from-beginning'

чтобы все почистить и синхронизировать снова

#delete connectors
curl -i -X DELETE -H "Accept:application/json" -H  "Content-Type:application/json" http://docker:8083/connectors/inventory-connector 
curl -i -X DELETE -H "Accept:application/json" -H  "Content-Type:application/json" http://docker:8083/connectors/jdbc-sink 

#clean destination data and replication slots
sleep 10
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "drop table customers"'

docker-compose exec postgressrc bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from pg_replication_slots "'
docker-compose exec postgressrc bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select pg_drop_replication_slot('"'debezium'"')"'

#clean topics
docker-compose exec kafka bash -c 'bin/kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic my_connect_offsets'
# docker-compose exec kafka bash -c 'bin/kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic my_connect_configs'
# docker-compose exec kafka bash -c 'bin/kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic my_connect_status'
docker-compose exec kafka bash -c 'bin/kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic customers'


sleep 5
docker-compose restart connect

sleep 10
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://docker:8083/connectors/ -d @sink.json
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://docker:8083/connectors/ -d @source.json

Репозиторий на github https://github.com/Radiofisik/DbSynk

В оригинале синхронизация базы MySQL c postgres Оригинальный вариант https://github.com/debezium/debezium-examples/tree/master/unwrap-smt