About the author

Zhong Shuo, currently at Decathlon as a PostgreSQL & Oracle DBA

Background

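A year ago I wrote an article introducing wal2json, a logical decoding plugin for PostgreSQL. Its purpose is to turn the change records in PG's WAL (mainly the data changes produced by DML) into readable output that operators can use for downstream data processing. That article mentioned the Debezium project but did not cover how to put Debezium into practice.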

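Debezium is built on Apache Kafka and provides connector plugins compatible with Kafka Connect. These connectors monitor database systems, convert the events recorded in the database log into event streams, and record those change event streams in Kafka. If the application (or database) stops unexpectedly, data consumers can still use the change event streams stored in Kafka to process the data accurately and completely.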

Deploying the Kafka Connector in containers

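To keep this article easy to follow and get hands-on quickly, I deploy everything in containers. Readers who care more about the Zookeeper or Kafka setup, or who use a non-containerized deployment, can adjust the configuration accordingly once they understand the solution.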

Preparation before deploying the Kafka Connector

● Zookeeper

● Kafka

● Kafka connector

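Note: the steps in this article do not show how the container's configuration files are mapped out. Because the Kafka connector container needs extra connector plugins and modified configuration files after installation, map these directories to the host, with one -v option per mapping:

-v /opt/kafka/logs:/kafka/logs -v /opt/kafka/data:/kafka/data -v /opt/kafka/config:/kafka/config -v /opt/kafka/connect:/kafka/connect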
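Since every host-to-container mapping needs its own -v flag, one is easy to drop when typing the command by hand. The following Python sketch assembles the flags from the four directories listed above. The helper name volume_args is hypothetical, and the other options a real debezium/connect container needs (environment variables, published ports, --link) are deliberately left out.

```python
# Host directory -> container directory, as listed in the note above.
VOLUME_MAPPINGS = {
    "/opt/kafka/logs": "/kafka/logs",
    "/opt/kafka/data": "/kafka/data",
    "/opt/kafka/config": "/kafka/config",
    "/opt/kafka/connect": "/kafka/connect",
}


def volume_args(mappings):
    """Hypothetical helper: return ['-v', 'host:container', ...], one -v per mapping."""
    args = []
    for host_dir, container_dir in mappings.items():
        args += ["-v", f"{host_dir}:{container_dir}"]
    return args
```

For example, `" ".join(["docker", "run", "-d", *volume_args(VOLUME_MAPPINGS), "debezium/connect"])` yields a command line with all four mappings, to which the remaining options still have to be added.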

Inside the connector container, the configuration file /kafka/config/connect-distributed.properties sets plugin.path=/kafka/connect by default.
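Because /kafka/config is mapped to /opt/kafka/config on the host, the configured plugin.path can be checked from the host side. Below is a simplified reader, assuming plain key=value lines. It does not handle the ':' separators, line continuations or escapes that the full .properties format allows, and plugin_path is a hypothetical helper name.

```python
from pathlib import Path


def read_properties(text):
    """Simplified .properties reader: 'key=value' lines, '#' or '!' comments.

    Assumption: ':' separators, line continuations and escapes are not supported.
    """
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or comment
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def plugin_path(config_file=Path("/opt/kafka/config/connect-distributed.properties")):
    """Read plugin.path from the host-side copy of the mapped config file."""
    return read_properties(config_file.read_text()).get("plugin.path")
```

Keep in mind that the debezium/connect image can also set properties from CONNECT_* environment variables at startup (the container log below shows "Setting property from CONNECT_PLUGIN_PATH"), so the file is not necessarily the last word.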

● MySQL & PostgreSQL

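The sample databases are installed with Docker here for quick verification; the configuration of each database (such as my.cnf and postgresql.conf) can be seen in the sample images. The decoding-plugin settings of the PG database are as follows: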

shared_preload_libraries = 'decoderbufs,wal2json'

wal_level = logical

max_wal_senders = 4

max_replication_slots = 4
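A quick pre-flight check of these settings can be scripted. The sketch below parses simple `name = value` lines from postgresql.conf and reports anything that would prevent logical decoding. The function names are hypothetical. On a running server, `SHOW wal_level;` is authoritative, because values can also be overridden, for example through postgresql.auto.conf via ALTER SYSTEM.

```python
def parse_pg_conf(text):
    """Very small postgresql.conf reader for 'name = value' lines.

    Assumptions: trailing '#' comments are stripped naively (a '#' inside a quoted
    value would break this), and the 'name value' form without '=' is ignored.
    """
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].strip()
        if not line:
            continue
        name, sep, value = line.partition("=")
        if sep:
            settings[name.strip()] = value.strip().strip("'")
    return settings


def logical_decoding_problems(settings):
    """Return a list of problems that would stop logical decoding; empty means OK."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    # Only check the limits when set explicitly; PostgreSQL 10+ defaults both to 10.
    for name in ("max_wal_senders", "max_replication_slots"):
        if name in settings and int(settings[name]) < 1:
            problems.append(f"{name} must be at least 1")
    return problems
```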

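Deploying the Debezium Connector plugin

Download the Debezium Connector Plugin

Extract it into the mapped plugin directory /opt/kafka/connect, then check that it is visible inside the container. Using PostgreSQL as an example: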

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.0.Final/debezium-connector-postgres-1.3.0.Final-plugin.tar.gz

# tar zxvf debezium-connector-postgres-1.3.0.Final-plugin.tar.gz -C /opt/kafka/connect/
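The same download-and-extract step can be done from Python with only the standard library. This is a sketch: the helper names are hypothetical, and the path check only rejects entries that would land outside the plugin directory (symlink targets are not checked; on recent Python versions `tar.extractall(filter="data")` is a stricter option).

```python
import tarfile
import urllib.request
from pathlib import Path

PLUGIN_URL = (
    "https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/"
    "1.3.0.Final/debezium-connector-postgres-1.3.0.Final-plugin.tar.gz"
)


def download(url, target):
    """Fetch the plugin archive to a local file (same as the wget step)."""
    urllib.request.urlretrieve(url, target)
    return Path(target)


def extract_plugin(archive, plugin_dir):
    """Unpack a plugin .tar.gz into plugin_dir; return its top-level directories."""
    plugin_dir = Path(plugin_dir)
    plugin_dir.mkdir(parents=True, exist_ok=True)
    root = plugin_dir.resolve()
    with tarfile.open(archive, "r:gz") as tar:
        members = tar.getmembers()
        # Refuse entries whose path would land outside plugin_dir ("../" tricks).
        # Symlink targets are not checked here.
        for member in members:
            dest = (plugin_dir / member.name).resolve()
            if dest != root and root not in dest.parents:
                raise ValueError(f"unsafe path in archive: {member.name}")
        tar.extractall(plugin_dir)
    return sorted({Path(m.name).parts[0] for m in members if Path(m.name).parts})
```

Usage would be `extract_plugin(download(PLUGIN_URL, "/tmp/pg-plugin.tar.gz"), "/opt/kafka/connect")`. Kafka Connect scans plugin.path when the worker starts, so the container has to be restarted to pick up a newly added plugin.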

Check the plugins inside the Kafka Connect container

# docker ps -f name='debezium.*connect' -l

CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS               NAMES
73dd73bd6872        debezium/connect    "/.r/r /docker-ent..."   42 hours ago        Up 42 hours                             r-debezium-connector-kafka-connector-1-a3726ec4

docker exec -it 73dd73bd6872 /bin/bash

[kafka@73dd73bd6872 connect]$ ls
debezium-connector-mysql  debezium-connector-postgres
[kafka@73dd73bd6872 connect]$ pwd
/kafka/connect

Verify in the container log that the Debezium PostgreSQL connector was loaded

# docker logs -t -f 73dd73bd6872 | more

2020-10-20T07:56:08.452286434Z --- Setting property from CONNECT_PLUGIN_PATH: plugin.path=/kafka/connect
2020-10-20T07:56:09.896637652Z 2020-10-20 07:56:09,889 - INFO  [main:DelegatingClassLoader@246] - Loading plugin from: /kafka/connect/debezium-connector-postgres
2020-10-20T07:56:10.574470660Z 2020-10-20 07:56:10,569 - INFO  [main:DelegatingClassLoader@269] - Registered loader: PluginClassLoader{pluginLocation=file:/kafka/connect/debezium-connector-postgres/}
2020-10-20T07:56:10.574502062Z 2020-10-20 07:56:10,569 - INFO  [main:DelegatingClassLoader@198] - Added plugin 'io.debezium.connector.postgresql.PostgresConnector'
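Rather than paging through the log, the "Added plugin" lines can be filtered programmatically. The pattern below is based only on the log lines shown above, and the function name is hypothetical.

```python
import re

# Matches the DelegatingClassLoader line shown above, e.g.
# "... - Added plugin 'io.debezium.connector.postgresql.PostgresConnector'"
ADDED_PLUGIN = re.compile(r"Added plugin '([\w.$]+)'")


def added_plugins(log_lines):
    """Return the class names of all plugins the worker reports as added."""
    found = []
    for line in log_lines:
        match = ADDED_PLUGIN.search(line)
        if match:
            found.append(match.group(1))
    return found
```

For instance, a small script (hypothetically named check_plugins.py) could print `"io.debezium.connector.postgresql.PostgresConnector" in added_plugins(sys.stdin)` and be run as `docker logs 73dd73bd6872 2>&1 | python3 check_plugins.py`. The timestamps added by `docker logs -t` do not affect the search.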

Register the PG connection with the connector

Configuration items (see the configuration options documentation for more):

# vim register-postgresql.json

{
  "name": "fulfillment-connector", ①
  "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector", ②
      "tasks.max": "1", ③
      "plugin.name": "wal2json_streaming", ④
      "database.hostname": "postgresql", ⑤
      "database.port": "5432", ⑥
      "database.user": "postgres", ⑦
      "database.password": "debezium", ⑧
      "database.dbname" : "postgres", ⑨
      "database.server.name": "fulfillment", ⑩
      "schema.include.list": "inventory" ⑪
      }
 }

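① name: the name under which the connector is registered with the Kafka Connect service.

② connector.class: the connector class. Once the plugin is installed correctly, the available classes can be listed with: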

curl -H "Accept:application/json" -sS localhost:8083/connector-plugins

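③ tasks.max: the maximum number of tasks to create for this connector. The PostgreSQL connector always uses a single task, so this parameter is not required.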

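④ plugin.name: the logical decoding plugin installed in PG and, as in the configuration above, listed in shared_preload_libraries. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput (pgoutput is built into PostgreSQL 10 and later). With wal2json, the whole transaction is decoded into a single JSON message that must be buffered in memory; for a very large transaction this slows processing and the message can exceed wal2json's 1 GB buffer. In that case, configure wal2json_streaming, which emits each change in a transaction as a separate message so the changes can be processed as a stream.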

⑤, ⑥, ⑦, ⑧, ⑨ database.[hostname | port | user | password | dbname]: the database connection information. Because the connector container was started with the link --link postgresql:postgresql, the host name of the PG container here is postgresql.

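⑩ database.server.name: the logical name of this database server. Debezium uses it as the prefix of the Kafka topic names, for example fulfillment.inventory.customers below.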

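⑪ schema.include.list: the schemas to capture, as a comma-separated list (Debezium treats each entry as a regular expression). If it is not set, all non-system schemas are captured.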
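The ①–⑪ markers above make the file invalid JSON, so the file actually submitted must not contain them. One way to avoid hand-editing mistakes is to generate register-postgresql.json from a dictionary. The values below are the ones used in this article. The REQUIRED_KEYS check is an assumption made for this sketch, not Debezium's own validation, and topic_for simply mirrors the naming seen in the topic fulfillment.inventory.customers.

```python
import json

REGISTRATION = {
    "name": "fulfillment-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "wal2json_streaming",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "debezium",
        "database.dbname": "postgres",
        "database.server.name": "fulfillment",
        "schema.include.list": "inventory",
    },
}

# Assumption for this sketch: the keys this article's setup depends on.
REQUIRED_KEYS = {
    "connector.class", "plugin.name", "database.hostname", "database.port",
    "database.user", "database.password", "database.dbname", "database.server.name",
}


def registration_json(registration):
    """Validate the config and return it as pretty-printed, valid JSON."""
    missing = REQUIRED_KEYS - set(registration["config"])
    if missing:
        raise ValueError(f"missing config keys: {sorted(missing)}")
    return json.dumps(registration, indent=2)


def topic_for(registration, schema, table):
    """Topic naming observed here: <database.server.name>.<schema>.<table>."""
    return f"{registration['config']['database.server.name']}.{schema}.{table}"
```

Writing the file is then `Path("register-postgresql.json").write_text(registration_json(REGISTRATION))` (with `from pathlib import Path`).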

Submit the registration to start the Debezium PostgreSQL connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgresql.json
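For scripting, the same POST can be issued with urllib from the standard library. This is a sketch that mirrors the curl command. The helper names are hypothetical, and the base URL assumes the REST port 8083 is reachable on localhost, as in the curl examples.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint used in this article


def register_request(base_url, registration):
    """Build the POST /connectors/ request that the curl command above sends."""
    return urllib.request.Request(
        f"{base_url}/connectors/",
        data=json.dumps(registration).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and return (HTTP status, response body)."""
    with urllib.request.urlopen(request, timeout=10) as resp:
        return resp.status, resp.read().decode("utf-8")
```

Kafka Connect normally answers a successful registration with 201 Created. For error statuses `urlopen` raises `urllib.error.HTTPError`, for example a 409 Conflict when a connector with that name already exists.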

Verify the registration

curl -H "Accept:application/json" localhost:8083/connectors/

["fulfillment-connector"]
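A connector name appearing in this list does not by itself mean it is working. Kafka Connect also exposes GET /connectors/<name>/status, which reports the state of the connector and of each task. The sketch below checks that everything is RUNNING. The helper names are hypothetical, and the sample status used in testing is illustrative, not captured from this setup.

```python
import json
import urllib.request


def fetch_json(url):
    """GET a Kafka Connect REST resource and decode the JSON body."""
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def is_running(status):
    """True if the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

Typical use: `is_running(fetch_json("http://localhost:8083/connectors/fulfillment-connector/status"))`.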

Likewise, log entries like the following should appear in the Kafka connector container:

2020-10-21T03:49:22.632581000Z 2020-10-21 03:49:22,632 - INFO  [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - TaskConfig values:
2020-10-21T03:49:22.632592801Z  task.class = class io.debezium.connector.postgresql.PostgresConnectorTask
2020-10-21T03:49:22.632596701Z
2020-10-21T03:49:22.632948722Z 2020-10-21 03:49:22,632 - INFO  [StartAndStopExecutor-connect-1-4:Worker@524] - Instantiated task fulfillment-connector-0 with version 1.3.0.Final of type io.debezium.connector.postgresql.PostgresConnectorTask
2020-10-21T03:49:22.633286743Z 2020-10-21 03:49:22,633 - INFO  [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - JsonConverterConfig values:
2020-10-21T03:49:22.634988447Z 2020-10-21 03:49:22,633 - INFO  [StartAndStopExecutor-connect-1-4:Worker@543] - Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task fulfillment-connector-0 using the worker config
2020-10-21T03:49:22.634994347Z 2020-10-21 03:49:22,633 - INFO  [StartAndStopExecutor-connect-1-4:Worker@550] - Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task fulfillment-connector-0 using the worker config

Stop and delete the registered connector

curl -X DELETE localhost:8083/connectors/fulfillment-connector
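The equivalent DELETE in Python is shown below. This is a sketch with hypothetical helper names. The connector name is URL-encoded in case it contains special characters. Kafka Connect normally answers a successful delete with 204 No Content.

```python
import urllib.error
import urllib.parse
import urllib.request


def delete_request(base_url, name):
    """Build DELETE /connectors/<name>; the name is URL-encoded."""
    return urllib.request.Request(
        f"{base_url}/connectors/{urllib.parse.quote(name, safe='')}", method="DELETE"
    )


def delete_connector(base_url, name):
    """Send the DELETE and return the HTTP status code (or the error code)."""
    try:
        with urllib.request.urlopen(delete_request(base_url, name), timeout=10) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code  # e.g. 404 when no connector has that name
```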

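After the connector has been deleted, drop its replication slot in the database; otherwise the next time the connector starts it may report that the slot already exists (by default the connector's slot.name is debezium). pg_drop_replication_slot fails while the slot is still active, so run it only once the connector has stopped.

postgres=# select pg_drop_replication_slot('debezium');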
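To make the cleanup safe to re-run, the slot can be dropped only when it exists and is inactive, in a single statement. The sketch below assumes a DB-API 2.0 driver with the %s parameter style and context-manager cursors, such as psycopg2. The driver choice and the helper name are assumptions, not part of the original setup.

```python
# Drops the slot only if it exists and no client is currently using it.
DROP_IF_INACTIVE = (
    "SELECT pg_drop_replication_slot(slot_name) "
    "FROM pg_replication_slots "
    "WHERE slot_name = %s AND NOT active"
)


def drop_slot_if_inactive(conn, slot_name="debezium"):
    """Hypothetical helper: return True if a slot was dropped.

    Assumes a DB-API connection (e.g. psycopg2) whose cursor supports 'with'.
    """
    with conn.cursor() as cur:
        cur.execute(DROP_IF_INACTIVE, (slot_name,))
        dropped = cur.rowcount > 0  # one row per slot that was dropped
    conn.commit()
    return dropped
```

With psycopg2 this would be called as `drop_slot_if_inactive(psycopg2.connect(host=..., dbname="postgres", user="postgres", password="debezium"))`, where the host depends on how the PG container's port is exposed.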

docker exec -it a5d73008228a /bin/bash
root@a5d73008228a:/# psql -d postgres -U postgres
postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+----------
slot_name           | debezium
plugin              | wal2json
slot_type           | logical
datoid              | 13067
database            | postgres
temporary           | f
active              | t
active_pid          | 1681
xmin                | 
catalog_xmin        | 605
restart_lsn         | 0/209B200
confirmed_flush_lsn | 0/209B238

postgres=# set search_path to inventory ;
SET

postgres=# \dt
                List of relations
  Schema   |       Name       | Type  |  Owner   
-----------+------------------+-------+----------
 inventory | customers        | table | postgres
 inventory | geom             | table | postgres
 inventory | orders           | table | postgres
 inventory | products         | table | postgres
 inventory | products_on_hand | table | postgres
 inventory | spatial_ref_sys  | table | postgres
(6 rows)

postgres=# \d customers
                                    Table "inventory.customers"
   Column   |          Type          | Collation | Nullable |                Default                
------------+------------------------+-----------+----------+---------------------------------------
 id         | integer                |           | not null | nextval('customers_id_seq'::regclass)
 first_name | character varying(255) |           | not null | 
 last_name  | character varying(255) |           | not null | 
 email      | character varying(255) |           | not null |

Use KafkaHQ or a similar tool to view the messages produced from the database

Topic: fulfillment.inventory.customers
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          …
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "fulfillment",
      "ts_ms": 1603252163493,
      "snapshot": "true",
      "db": "postgres",
      "schema": "inventory",
      "table": "customers",
      "txId": 602,
      "lsn": 34078720,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1603252163495,
    "transaction": null
  }
}
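A consumer usually needs only a few fields from such a message. The sketch below reduces a change event like the one above to the table, operation, and before/after images. How the messages are read from Kafka (which client library) is left open and is an assumption of the reader's setup. The op codes c/u/d/r are Debezium's create/update/delete/read (snapshot). A null value is a tombstone, which Debezium sends after deletes by default.

```python
import json

# Debezium "op" codes; unknown codes are passed through unchanged.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change(message):
    """Reduce a Debezium change event (JSON converter) to table, op, before/after.

    Returns None for tombstones (a null message value).
    """
    if message is None:
        return None
    event = json.loads(message)
    if event is None:
        return None
    # With schemas enabled (as in the message above) the event sits under
    # "payload"; with value.converter.schemas.enable=false it is the top level.
    payload = event["payload"] if "payload" in event else event
    source = payload["source"]
    return {
        "table": f"{source['schema']}.{source['table']}",
        "op": OPERATIONS.get(payload["op"], payload["op"]),
        "snapshot": source.get("snapshot"),
        "before": payload.get("before"),
        "after": payload.get("after"),
    }
```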

References:

Debezium tutorial

https://debezium.io/documentation/reference/tutorial.html

Debezium

Confluent for debezium-connect-postgres

https://docs.confluent.io/current/connect/debezium-connect-postgres/index.html


#2 1006575258
2021-07-02 09:39:58+08

Hi, may I ask: does the PostgreSQL WAL record which user performed each operation?

#3 juju
2020-10-31 21:38:11+08

Nice~

© 2010 PostgreSQL Chinese Community