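About the author
Zhong Shuo, currently working at Decathlon, PostgreSQL & Oracle DBA
Background
A year ago I wrote an article introducing wal2json, a logical decoding tool for PostgreSQL. It turns the change records in PG's WAL (mainly the data changes made by DML) into readable output that operations staff can use for downstream data processing. That article mentioned the Debezium project but did not cover how to use it in practice.
Debezium is built on Apache Kafka and provides connectors compatible with Kafka Connect. It monitors database systems, turns the events recorded in the database log into event streams, and writes these change events to Kafka. If the application (or database) stops unexpectedly, data consumers can still use the change events stored in Kafka to process the data accurately and completely.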
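Deploying Kafka Connector with containers
To keep this document readable and get hands-on quickly, I deploy everything with containers. Readers who care more about the Zookeeper or Kafka setup, or who use a non-containerized deployment, can adjust the configuration once they understand the solution.
Preparation before deploying Kafka Connector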
● Zookeeper
● Kafka
● Kafka connector
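Note: the steps in this document do not show how the container's configuration files are mapped out. After installing the Kafka connector container you need to add connector plugins and modify configuration files, so map these directories to the host: -v /opt/kafka/logs:/kafka/logs -v /opt/kafka/data:/kafka/data -v /opt/kafka/config:/kafka/config -v /opt/kafka/connect:/kafka/connect
In the connector, the configuration file /kafka/config/connect-distributed.properties sets plugin.path=/kafka/connect by default.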
● MySQL & PostgreSQL
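The sample databases are installed with Docker here for quick verification. Each sample shows its database configuration (such as my.cnf and postgresql.conf). The decoding-plugin settings in the PG database are as follows: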
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
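Before starting the connector, it can help to check these settings with a script rather than by eye. The Python sketch below is hypothetical. It parses simple key = value lines from a postgresql.conf and flags the settings listed above. It only sees lines written explicitly in the file (server defaults are not considered), and it ignores edge cases such as '#' inside quoted values. The minimum of 1 for the two counters is an assumption for a single connector.

```python
# Hypothetical pre-flight check for the logical decoding settings shown above.
# Assumed minimums for one Debezium connector; raise them if you run more.
MINIMUMS = {
    "max_wal_senders": 1,
    "max_replication_slots": 1,
}


def parse_conf(text):
    """Parse simple `key = value` lines from postgresql.conf text."""
    settings = {}
    for raw in text.splitlines():
        # Drop trailing comments; assumes no '#' inside quoted values.
        line = raw.split("#", 1)[0].strip()
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip().strip("'")
    return settings


def check_conf(text, decoding_library="wal2json"):
    """Return a list of problems; an empty list means every check passed.

    Pass decoding_library=None when using pgoutput, which is built in and
    does not need shared_preload_libraries.
    """
    settings = parse_conf(text)
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level should be 'logical', got {settings.get('wal_level')!r}")
    for key, minimum in MINIMUMS.items():
        value = settings.get(key)
        if value is None or not value.isdigit() or int(value) < minimum:
            problems.append(f"{key} should be >= {minimum}, got {value!r}")
    if decoding_library is not None:
        libs = [s.strip() for s in settings.get("shared_preload_libraries", "").split(",")]
        if decoding_library not in libs:
            problems.append(f"{decoding_library} is not in shared_preload_libraries")
    return problems
```

Running check_conf() on the four lines above returns an empty list. A configuration with wal_level = replica would be reported before any connector is registered.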
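Deploying the Debezium Connector plugin
Download the Debezium Connector Plugin
Extract it into the mapped plugin directory /opt/kafka/connect, then check that it is loaded in the container. Using PostgreSQL as an example: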
# tar zxvf debezium-connector-postgres-1.3.0.Final-plugin.tar.gz -C /opt/kafka/connect/
Check the plugins inside the Kafka container
# docker ps -f name='debezium.*connect' -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
73dd73bd6872 debezium/connect "/.r/r /docker-ent..." 42 hours ago Up 42 hours r-debezium-connector-kafka-connector-1-a3726ec4
docker exec -it 73dd73bd6872 /bin/bash
[kafka@73dd73bd6872 connect]$ ls
debezium-connector-mysql debezium-connector-postgres
[kafka@73dd73bd6872 connect]$ pwd
/kafka/connect
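Besides running ls inside the container, you can run the same check on the host against the mapped directory. The sketch below assumes the /opt/kafka/connect mapping from the note above. It treats a connector as present when its directory holds at least one .jar file, because Kafka Connect loads each sub-directory of plugin.path as a separate plugin. The function name list_debezium_plugins is hypothetical.

```python
import os


def list_debezium_plugins(plugin_path="/opt/kafka/connect"):
    """Return Debezium connector directories under plugin_path that contain jars.

    Assumes the host directory mapped to /kafka/connect in the container.
    """
    found = []
    for name in sorted(os.listdir(plugin_path)):
        full = os.path.join(plugin_path, name)
        if os.path.isdir(full) and name.startswith("debezium-connector-"):
            # A directory without any jar would not provide a usable connector.
            if any(f.endswith(".jar") for f in os.listdir(full)):
                found.append(name)
    return found
```

After extracting the tarball, list_debezium_plugins() would be expected to include debezium-connector-postgres.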
Verify that the Debezium PostgreSQL connector was loaded
# docker logs -t -f 73dd73bd6872 | more
2020-10-20T07:56:08.452286434Z --- Setting property from CONNECT_PLUGIN_PATH: plugin.path=/kafka/connect
2020-10-20T07:56:09.896637652Z 2020-10-20 07:56:09,889 - INFO [main:DelegatingClassLoader@246] - Loading plugin from: /kafka/connect/debezium-connector-postgres
2020-10-20T07:56:10.574470660Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@269] - Registered loader: PluginClassLoader{pluginLocation=file:/kafka/connect/debezium-connector-postgres/}
2020-10-20T07:56:10.574502062Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@198] - Added plugin 'io.debezium.connector.postgresql.PostgresConnector'
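If you check this regularly, you can filter the log output with a small script instead of paging through it with more. The sketch below is hypothetical. It only looks for the "Added plugin '...'" lines shown above, and it assumes each such line arrives unwrapped (for example from docker logs 73dd73bd6872 2>&1 piped into it).

```python
import re

# Matches the "Added plugin '<class>'" lines Kafka Connect prints at startup.
ADDED_PLUGIN = re.compile(r"Added plugin '([^']+)'")


def loaded_plugins(log_text):
    """Return the plugin class names Kafka Connect reported as added."""
    return ADDED_PLUGIN.findall(log_text)


def is_loaded(log_text, connector_class="io.debezium.connector.postgresql.PostgresConnector"):
    """True if the given connector class appears among the added plugins."""
    return connector_class in loaded_plugins(log_text)
```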
Registering the PG connection with the connector
Configuration items (see the configuration options documentation for more):
# vim register-postgresql.json
{
"name": "fulfillment-connector", ①
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", ②
"tasks.max": "1", ③
"plugin.name": "wal2json_streaming", ④
"database.hostname": "postgresql", ⑤
"database.port": "5432", ⑥
"database.user": "postgres", ⑦
"database.password": "debezium", ⑧
"database.dbname" : "postgres", ⑨
"database.server.name": "fulfillment", ⑩
"schema.include.list": "inventory" ⑪
}
}
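① name: the name under which the connector is registered with the connector service.
② connector.class: the connector class. If the plugin is installed correctly, you can look up the class name as follows: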
curl -H "Accept:application/json" -sS localhost:8083/connector-plugins
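③ tasks.max: the maximum number of tasks the connector should create. The PostgreSQL connector always uses a single task, so this parameter is not required.
④ plugin.name: selects the logical decoding plugin on the PG side. It must match a plugin installed on the server, such as those loaded through shared_preload_libraries above. Valid values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. With wal2json, a large transaction (e.g. 1 GB) is decoded into a single JSON document containing the whole transaction, and buffering it in memory hurts processing efficiency. In that case, configure wal2json_streaming, which emits each change in a transaction as a separate message so it can be processed as a stream.
⑤, ⑥, ⑦, ⑧, ⑨ database.[hostname | port | user | password | dbname]: the DB connection information. The connector container was started with the link --link postgresql:postgresql, so the PG container's hostname here is postgresql.
⑩ database.server.name: a logical name for the database server, used as the prefix of the Kafka topic names.
⑪ schema.include.list: the schemas to capture, separated by commas. By default, all non-system schemas are captured.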
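The ①..⑪ markers make register-postgresql.json invalid JSON if copied verbatim. One option is to generate the file from a script. The Python sketch below is hypothetical. It rebuilds the same payload with the values from the example (quoting every value, as the example does). It also checks the body returned by GET /connector-plugins, which is a list of objects with a "class" field, for the connector class before you register.

```python
import json

CONNECTOR_CLASS = "io.debezium.connector.postgresql.PostgresConnector"


def build_registration(name="fulfillment-connector", overrides=None):
    """Build the register-postgresql.json payload without the ①..⑪ markers."""
    config = {
        "connector.class": CONNECTOR_CLASS,
        "tasks.max": "1",
        "plugin.name": "wal2json_streaming",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "debezium",
        "database.dbname": "postgres",
        "database.server.name": "fulfillment",
        "schema.include.list": "inventory",
    }
    # Keep the article's convention of string values for every setting.
    config.update({k: str(v) for k, v in (overrides or {}).items()})
    return {"name": name, "config": config}


def plugin_installed(connector_plugins_json, connector_class=CONNECTOR_CLASS):
    """Check the GET /connector-plugins response body for a connector class."""
    plugins = json.loads(connector_plugins_json)
    return any(p.get("class") == connector_class for p in plugins)
```

To produce the file, run json.dump(build_registration(), f, indent=2) with f opened on register-postgresql.json. Other databases or schemas can be passed through overrides, for example {"schema.include.list": "inventory,public"}.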
Register the connection with the Debezium PostgreSQL Connector and start it
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgresql.json
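The curl call above can also be made from Python using only the standard library. The sketch below assumes the Kafka Connect REST endpoint is reachable at http://localhost:8083, as in this article. Kafka Connect answers 201 Created when the connector is created. urlopen raises HTTPError for error responses such as 409 Conflict when the name already exists.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint


def registration_request(payload, base_url=CONNECT_URL):
    """Build the same POST request as the curl command above (not sent yet)."""
    return urllib.request.Request(
        f"{base_url}/connectors/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register(payload, base_url=CONNECT_URL, timeout=10):
    """Send the request and return (HTTP status, decoded response body)."""
    with urllib.request.urlopen(registration_request(payload, base_url), timeout=timeout) as resp:
        return resp.status, json.loads(resp.read().decode("utf-8"))
```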
Verify the registration
curl -H "Accept:application/json" localhost:8083/connectors/
["fulfillment-connector"]
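The list above only shows that the connector is registered. Kafka Connect also exposes GET /connectors/<name>/status, which reports the state of the connector and each of its tasks. The sketch below assumes the same localhost:8083 endpoint. It treats the connector as healthy only when the connector and all of its tasks report RUNNING, which is a stricter rule chosen for illustration.

```python
import json
import urllib.request


def status_url(name, base_url="http://localhost:8083"):
    """URL of the Kafka Connect status endpoint for one connector."""
    return f"{base_url}/connectors/{name}/status"


def is_running(status):
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def fetch_status(name, base_url="http://localhost:8083", timeout=10):
    """Fetch and decode the status document (requires a running Kafka Connect)."""
    with urllib.request.urlopen(status_url(name, base_url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

A FAILED task usually carries a "trace" field with the Java stack trace, which is worth printing when is_running() returns False.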
The Kafka connector container log should also show entries like the following:
2020-10-21T03:49:22.632581000Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - TaskConfig values:
2020-10-21T03:49:22.632592801Z task.class = class io.debezium.connector.postgresql.PostgresConnectorTask
2020-10-21T03:49:22.632596701Z
2020-10-21T03:49:22.632948722Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:Worker@524] - Instantiated task fulfillment-connector-0 with version 1.3.0.Final of type io.debezium.connector.postgresql.PostgresConnectorTask
2020-10-21T03:49:22.633286743Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - JsonConverterConfig values:
2020-10-21T03:49:22.634988447Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@543] - Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task fulfillment-connector-0 using the worker config
2020-10-21T03:49:22.634994347Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@550] - Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task fulfillment-connector-0 using the worker config
Stop and delete the registered connector
curl -X DELETE localhost:8083/connectors/fulfillment-connector
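Drop the slot in the database. Otherwise, the next time the connector starts it reports an error that the slot already exists (by default the connector configuration uses a slot named debezium, set by slot.name).
postgres=# select pg_drop_replication_slot('debezium');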
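The two cleanup steps can be scripted together. The sketch below is hypothetical. It builds the DELETE request equivalent to the curl call and the SQL to drop the slot, but it does not choose a PostgreSQL driver for you. pg_drop_replication_slot() raises an error while a consumer is still attached, so the generated SQL only drops the slot when it is no longer active. Slot names are restricted to lower-case letters, digits and underscores, and the code rejects anything else instead of quoting it.

```python
import re
import urllib.request


def delete_request(name, base_url="http://localhost:8083"):
    """Equivalent of: curl -X DELETE localhost:8083/connectors/<name>"""
    return urllib.request.Request(f"{base_url}/connectors/{name}", method="DELETE")


def drop_slot_sql(slot_name="debezium"):
    """SQL that drops the slot only if nothing is still reading from it."""
    if not re.fullmatch(r"[a-z0-9_]+", slot_name):
        raise ValueError(f"invalid replication slot name: {slot_name!r}")
    return (
        "SELECT pg_drop_replication_slot(slot_name) "
        f"FROM pg_replication_slots WHERE slot_name = '{slot_name}' AND NOT active;"
    )
```

If the connector has just been deleted, the slot may still show active = t for a moment. In that case the query returns no rows and can simply be re-run.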
docker exec -it a5d73008228a /bin/bash
root@a5d73008228a:/# psql -d postgres -U postgres
postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+----------
slot_name | debezium
plugin | wal2json
slot_type | logical
datoid | 13067
database | postgres
temporary | f
active | t
active_pid | 1681
xmin |
catalog_xmin | 605
restart_lsn | 0/209B200
confirmed_flush_lsn | 0/209B238
postgres=# set search_path to inventory ;
SET
postgres=# \dt
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
postgres=# \d customers
Table "inventory.customers"
Column | Type | Collation | Nullable | Default
------------+------------------------+-----------+----------+---------------------------------------
id | integer | | not null | nextval('customers_id_seq'::regclass)
first_name | character varying(255) | | not null |
last_name | character varying(255) | | not null |
email | character varying(255) | | not null |
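pg_replication_slots prints LSNs in PostgreSQL's "high/low" hexadecimal text form (restart_lsn 0/209B200, confirmed_flush_lsn 0/209B238 above). Debezium reports source.lsn in its events as a plain integer (34078720 in the event below). The Python sketch below converts between the two forms so that the values can be compared. The function names are hypothetical. On the SQL side, pg_wal_lsn_diff() gives the same byte difference.

```python
def lsn_to_int(lsn):
    """Convert a PostgreSQL LSN such as '0/209B238' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value):
    """Convert an integer LSN (e.g. Debezium's source.lsn) to PG text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"
```

For the slot above, lsn_to_int("0/209B238") - lsn_to_int("0/209B200") is 56 bytes. int_to_lsn(34078720) gives "0/2080000".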
View the messages produced from the database with Kafka HQ or a similar tool
Topic: fulfillment.inventory.customers
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
…
"payload": {
"before": null,
"after": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"source": {
"version": "1.3.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1603252163493,
"snapshot": "true",
"db": "postgres",
"schema": "inventory",
"table": "customers",
"txId": 602,
"lsn": 34078720,
"xmin": null
},
"op": "r",
"ts_ms": 1603252163495,
"transaction": null
}
}
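A consumer of this topic typically needs only the operation, the table and the row. The Python sketch below is hypothetical. It reads those from one message value, using the Debezium op codes c (create), u (update), d (delete) and r (read, i.e. a snapshot row such as the one above). It accepts both the JsonConverter "schema + payload" envelope shown here and a bare payload (as produced when value.converter.schemas.enable=false). It returns None for a null value, such as the tombstone that may follow a delete.

```python
import json

# Debezium change-event operation codes.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize(event_json):
    """Return (operation, "schema.table", row) for one Debezium message value."""
    event = json.loads(event_json)
    if event is None:  # null value, e.g. a tombstone after a delete
        return None
    payload = event.get("payload", event)
    source = payload["source"]
    op = payload["op"]
    # Deletes carry the old row in `before`; other operations use `after`.
    row = payload["before"] if op == "d" else payload["after"]
    return OPS.get(op, op), f"{source['schema']}.{source['table']}", row
```

For the message above, summarize() returns ("read", "inventory.customers", {...}) with the Sally Thomas row.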
References:
Debezium tutorial
Debezium
Confluent for debezium-connect-postgres