PostgreSQL 9.5.3 中文手册 | |||
---|---|---|---|
上一页 | 上一级 | 章 46. 逻辑解码 | 下一页 |
可以在 PostgreSQL 源码树的 contrib/test_decoding 子目录中找到一个输出插件的例子。
一个输出插件是通过动态载入一个以输出插件名称作为基础名称的共享库来载入的。
将使用普通的库搜索路径来定位该库。为了提供所要求的输出插件回调并且指示该
库确实是一个输出插件,需要提供一个名为
_PG_output_plugin_init
的函数。这个函数会被传入一个
结构,其中被填充了各个动作的回调函数指针。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
回调函数begin_cb
、change_cb
以及commit_cb
是必需的,而
startup_cb
、filter_by_origin_cb
和shutdown_cb
是可选的。
要解码、格式化并且输出更改,输出插件可以使用大部分后端的标准功能,包括调用 输出函数。只要访问的关系是initdb在 pg_catalog模式中创建的或者被使用
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
标记为用户提供的系统表,就允许对关系的只读访问。任何导致事务 ID 分配的动作 都被禁止。其中包括写表、执行 DDL 更改以及调用txid_current()。
输出插件回调可以以近乎任意格式向消费者传递数据。对于某些用例,例如通过 SQL 查看更改,以可能包含任何数据的数据类型(例如bytea)返回数据 可能会很麻烦。如果输出插件只输出服务器编码的文本数据,它可以在 启动回调中通过设 置为OUTPUT_PLUGIN_TEXTUAL_OUTPUT替代 OUTPUT_PLUGIN_BINARY_OUTPUT来声明这一点。在这种情况下, 所有的数据必须是属于服务器的编码,这样一个text数据就能包含它。在 启用了断言的编译中会检查这一点。
一个输出插件需要提供一些回调,它通过它们得到有关更改发生的通知。
并发事务以提交顺序被解码,并且只有属于特定事务的更改会在 begin和commit回调之间被解码。被显式 或隐式回滚的事务不会被解码。成功的检查点被折叠到包含它们的事务中,并且 保持它们在该事务中被执行的顺序。
注意: 只有已经被安全地刷入磁盘的事务将会被解码。当 synchronous_commit被设置为off 时,这会导致一个COMMIT在随后的 pg_logical_slot_get_changes()中不会立即被解码。
只要一个复制槽被创建或者被要求流式传送更改,可选的
startup_cb
回调就会被调用,不管有多少更改准备输出。
typedef void (*LogicalDecodeStartupCB) ( struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init );
当复制槽被创建时,is_init参数将为真,否则为假。 options指向一个输出插件可以设置的选项 的结构:
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; } OutputPluginOptions;
output_type必须被设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 或者OUTPUT_PLUGIN_BINARY_OUTPUT。另见 第 46.6.3 节.
启动回调应该验证出现在 ctx->output_plugin_options中的选项。如果输出插件 需要有一个状态,它可以使用 ctx->output_plugin_private来存储之。
只要一个之前活跃的复制槽不再使用,就会调用可选的
shutdown_cb
回调,它可以被用来释放输出插件
私有的资源。该槽并不一定需要被删除,只要其中的流被停止即可。
typedef void (*LogicalDecodeShutdownCB) ( struct LogicalDecodingContext *ctx );
只要一个已提交事务的开始动作被解码,就会调用必须提供的
begin_cb
回调。被中止的事务及其内容不会被解码。
typedef void (*LogicalDecodeBeginCB) ( struct LogicalDecodingContext *, ReorderBufferTXN *txn );
txn参数包含有关该事务的元信息,例如该 事务被提交的时间戳以及该事务的 XID。
只要一个已提交事务的提交动作被解码,就会调用必须提供的
commit_cb
回调。在此之前,如果有任何被修改
的行,将为所有被修改的行调用change_cb
回调。
typedef void (*LogicalDecodeCommitCB) ( struct LogicalDecodingContext *, ReorderBufferTXN *txn );
对于一个事务中的每一个行修改,都将调用必须提供的
change_cb
回调,这种修改可能是一个
INSERT、UPDATE或者
DELETE。即使原始命令一次修改了多行,该回调也会
为其中的每一行调用一次。
typedef void (*LogicalDecodeChangeCB) ( struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change );
ctx和txn参数与
begin_cb
和commit_cb
回调具有相同的内容,但是额外多出一个关系描述符
relation指向该行所属的关系以及一个结构
change描述被传入的行修改。
注意: 只有没有被标记为“不做日志”(见 UNLOGGED)并且非临时(见 TEMPORARY or TEMP)的用户定义表中的 更改才能用逻辑解码抽取。
调用可选的filter_by_origin_cb
回调来确定输出插件是否对从origin_id
重放的数据感兴趣。
typedef bool (*LogicalDecodeFilterByOriginCB) ( struct LogicalDecodingContext *ctx, RepNodeId origin_id );
ctx参数具有与其他回调相同的内容。没有信息, 但来源是可用的。要表明源自传入的节点的更改是不相关的,返回true, 导致它们被过滤掉;否则为假。对于已被过滤掉的事务和更改, 不会调用其他回调。
这在实现级联或多向复制解决方案时很有用。 通过原点过滤允许防止在这样的设置中来回复制相同的改变。 虽然事务和更改还包含有关原点的信息,通过此回调进行过滤显然更有效。
在begin_cb
、commit_cb
或者
change_cb
回调中,为了实际产生输出,
输出插件可以把数据写入到ctx->out中的
StringInfo输出缓冲区中。在写出到输出缓冲区之前,必须先
调用OutputPluginPrepareWrite(ctx, last_write)
,在完
成写入到缓冲区后,必须调用
OutputPluginWrite(ctx, last_write)
来执行写出。
last_write指出一次特定的写出是否为该回调的最后
一次写出。
下面的例子展示了如何把数据输出给一个输出插件的消费者:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);