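skytools' PgQ uses PostgreSQL as a message queue. londiste3 builds table-level logical replication on top of it, and skytools also exposes an API that lets users write their own message subscribers.
This article presents a trigger-based implementation of what PgQ does, because in some situations installing PgQ is not convenient. In those cases, user-defined functions can provide the same functionality.
The end result is very similar to PostgreSQL logical decoding, except that the performance impact is larger because it relies on triggers. For versions that do not support logical decoding (introduced in 9.4), the replication method presented here is a good choice.
The trigger records everything needed for replay: transaction ID, schema, table, operation (OP), OLD record, NEW record, and transaction commit time. Replay strictly follows transaction commit order.
DDL has to be handled some other way, for example with event triggers.
Design notes (there is still plenty of room for improvement; this article is only a demonstration):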
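- Use HSTORE to store the records of all tracked objects.
- Record the transaction ID; this is what makes per-transaction replay practical.
  If replay is not done per transaction, the result is only eventually consistent.
  Replaying without transaction boundaries has another problem: if primary/foreign key checks are deferred, statements replayed one at a time on the target may fail.
- Add a consumed flag to the data. There are better approaches (for example, recording each subscriber's read state); with a flag, only one subscriber is possible.
- Use quote_nullable to escape values.
- Use quote_ident to escape identifiers (including keywords).
- Use 7 tables to record messages, one for each day of the week. (If the volume is large this can be improved, for example with one table per hour.)
  (Alternatively, follow PgQ and use a separate ticker process to cut the message stream into batches.)
- Use an event trigger to record DDL statements and mark their status in the queue table. When the consumer reaches a DDL, it stops fetching messages; once the DDL flag has been updated, fetching resumes.
- Enable track_commit_timestamp, fetch changes in transaction commit order, and rebuild the SQL.
  (In theory, this replay order guarantees consistency.)
  (track_commit_timestamp is new in 9.5 and is not used in this article. BDR also depends on commit timestamps.)
  Alternatively, use clock_timestamp and replay exactly in SQL execution order. This article uses that approach.
- Transactions that cross midnight: such a transaction could have its second half replayed before its first half. Running mq.build_sql under REPEATABLE READ avoids this.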
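For reference, on PostgreSQL 9.5 or later the commit-timestamp approach from the notes above could look roughly like the query below, run against the queue table defined later in this article. This is a sketch of the alternative, not what the article implements: it assumes track_commit_timestamp = on has been set (a server restart is required), and it relies on the fact that an unconsumed queue row's xmin is the ID of the transaction that inserted it.

```sql
-- Sketch only: requires PostgreSQL 9.5+ with track_commit_timestamp = on.
-- xmin of an unconsumed queue row is the inserting transaction's ID, so
-- pg_xact_commit_timestamp(xmin) returns that transaction's commit time.
SELECT pg_xact_commit_timestamp(xmin) AS commit_ts,
       x_id, id, table_schema, table_name, op
FROM mq.table_change_rec
WHERE consumed = false
ORDER BY pg_xact_commit_timestamp(xmin), id;
```

Only unconsumed rows can be ordered this way: setting consumed = true writes a new row version with a new xmin.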
Create a test table
CREATE TABLE test (id int primary key, info text, crt_time timestamp(0));
Create the hstore extension
create extension hstore;
Create the queue schema
CREATE SCHEMA IF NOT EXISTS mq;
Create the message queue tables (the parent table here; the 7 child tables follow)
CREATE TABLE mq.table_change_rec (
id serial8 primary key,
x_id int8 default txid_current(), -- transaction ID
consumed boolean not null default false, -- whether the record has been consumed
relid oid, -- pg_class.oid
table_schema name, -- schema name
table_name name, -- table name
when_tg text, -- after or before
level text, -- statement or row
op text, -- delete, update, or insert or truncate
old_rec hstore,
new_rec hstore,
crt_time timestamp without time zone not null, -- time (transaction end time)
dbname name, -- database name
username name, -- user name
client_addr inet, -- client address
client_port int -- client port
);
Create partial indexes (unconsumed rows only) on the transaction ID and on time
create index x_id_table_change_rec on mq.table_change_rec(x_id) where consumed=false;
create index crt_time_id_table_change_rec on mq.table_change_rec(crt_time,id) where consumed=false;
Create the child tables (one per day of the week, 0 = Sunday to 6 = Saturday)
create table mq.table_change_rec0 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec1 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec2 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec3 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec4 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec5 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec6 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
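Because the trigger function below picks the child table from EXTRACT(DOW ...) (0 = Sunday through 6 = Saturday), you can check by hand which table a given timestamp lands in, and use tableoid on the parent table to see how many unconsumed rows each child holds. A small sketch:

```sql
-- 2016-01-05 was a Tuesday, so DOW = 2 and its rows go to mq.table_change_rec2.
SELECT 'mq.table_change_rec' || extract(dow FROM timestamp '2016-01-05 10:29:09')::int AS target_table;

-- Pending (unconsumed) rows per child table, read through the parent.
SELECT tableoid::regclass AS child_table,
       sum(CASE WHEN consumed THEN 0 ELSE 1 END) AS pending
FROM mq.table_change_rec
GROUP BY tableoid
ORDER BY tableoid;
```

The first query agrees with the tableoid column in the test output further down, where every row from 2016-01-05 sits in table_change_rec2.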
Create a function that returns the transaction end time; it is implemented with a transaction-local variable.
create or replace function mq.get_commit_time() returns timestamp without time zone as $$
declare
res timestamp without time zone;
begin
show commit_time.realval into res;
return res;
exception when others then -- not set in this transaction yet: set it with the SQL below
res := clock_timestamp();
execute 'set local commit_time.realval = '''||res||''''; -- set a transaction-local variable
return res;
end;
$$ language plpgsql;
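To see the caching behaviour of mq.get_commit_time(), you can call it twice inside one transaction with a pause in between. Both calls should return the same value, because the first call stores it in the custom setting commit_time.realval with SET LOCAL. A quick psql check:

```sql
BEGIN;
SELECT mq.get_commit_time() AS first_call;    -- not set yet: clock_timestamp() is cached
SELECT pg_sleep(0.5);
SELECT mq.get_commit_time() AS second_call;   -- same value as first_call
SELECT current_setting('commit_time.realval') AS cached_text;
COMMIT;
```

After COMMIT the SET LOCAL value is discarded, so the next transaction caches a fresh clock_timestamp(). Within the same session the placeholder may then read back as an empty string instead of raising an error; casting '' to timestamp fails, so the exception branch covers that case too.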
Create the trigger function that writes changes to the queue
CREATE OR REPLACE FUNCTION mq.dml_trace()
RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
v_new_rec hstore;
v_old_rec hstore;
v_username name := session_user;
v_dbname name := current_database();
v_client_addr inet := inet_client_addr();
v_client_port int := inet_client_port();
v_crt_time timestamp without time zone := mq.get_commit_time();
v_xid int8 := txid_current();
v_dofweek int := EXTRACT(DOW FROM v_crt_time);
BEGIN
case TG_OP
when 'DELETE' then
v_old_rec := hstore(OLD.*);
case v_dofweek
when 0 then
insert into mq.table_change_rec0 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 1 then
insert into mq.table_change_rec1 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 2 then
insert into mq.table_change_rec2 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 3 then
insert into mq.table_change_rec3 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 4 then
insert into mq.table_change_rec4 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 5 then
insert into mq.table_change_rec5 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 6 then
insert into mq.table_change_rec6 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
end case;
when 'INSERT' then
v_new_rec := hstore(NEW.*);
case v_dofweek
when 0 then
insert into mq.table_change_rec0 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 1 then
insert into mq.table_change_rec1 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 2 then
insert into mq.table_change_rec2 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 3 then
insert into mq.table_change_rec3 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 4 then
insert into mq.table_change_rec4 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 5 then
insert into mq.table_change_rec5 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 6 then
insert into mq.table_change_rec6 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
end case;
when 'UPDATE' then
v_old_rec := hstore(OLD.*);
v_new_rec := hstore(NEW.*);
case v_dofweek
when 0 then
insert into mq.table_change_rec0 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 1 then
insert into mq.table_change_rec1 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 2 then
insert into mq.table_change_rec2 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 3 then
insert into mq.table_change_rec3 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 4 then
insert into mq.table_change_rec4 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 5 then
insert into mq.table_change_rec5 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
when 6 then
insert into mq.table_change_rec6 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
end case;
else
return null;
end case;
RETURN null;
END;
$BODY$ strict;
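The seven near-identical INSERT branches above can be collapsed with dynamic SQL. The following is an alternative sketch, not the article's function: the name mq.dml_trace_dyn is hypothetical, it builds the child table name with format('%I', ...) and passes every value through EXECUTE ... USING, and it writes NULL into whichever of old_rec/new_rec does not apply, which is what the column defaults produce in the original.

```sql
-- Hypothetical compact variant of mq.dml_trace (not the article's function).
CREATE OR REPLACE FUNCTION mq.dml_trace_dyn()
RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
  v_crt_time timestamp without time zone := mq.get_commit_time();
  v_old_rec  hstore;   -- stays NULL for INSERT
  v_new_rec  hstore;   -- stays NULL for DELETE
BEGIN
  IF TG_OP IN ('UPDATE', 'DELETE') THEN
    v_old_rec := hstore(OLD.*);
  END IF;
  IF TG_OP IN ('INSERT', 'UPDATE') THEN
    v_new_rec := hstore(NEW.*);
  END IF;

  -- Route to mq.table_change_rec0 .. mq.table_change_rec6 by day of week (0 = Sunday).
  EXECUTE format(
    'insert into mq.%I (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec,
                        crt_time, dbname, username, client_addr, client_port)
     values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)',
    'table_change_rec' || extract(dow FROM v_crt_time)::int)
  USING TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_WHEN, TG_LEVEL, TG_OP,
        v_old_rec, v_new_rec, v_crt_time,
        current_database(), session_user, inet_client_addr(), inet_client_port();

  RETURN NULL;  -- the return value is ignored for AFTER row triggers
END;
$BODY$;
```

The trade-off is that EXECUTE plans the INSERT again on every row, whereas the static statements in the original use cached plans, so the compact version may be somewhat slower under heavy write load.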
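Create the trigger that tracks DML. It is declared DEFERRABLE INITIALLY DEFERRED so that it fires at transaction end, which lets it capture the transaction's end time.
For this reason, transactions must not switch it to IMMEDIATE (SET CONSTRAINTS ... IMMEDIATE); otherwise the recorded time becomes the time of the first trigger firing.
When a transaction fires the trigger for multiple rows, all of them record the same time: the transaction end time.
Replay also follows the order of transaction end times.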
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON test DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace();
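To check that all rows of one transaction share the same time, you can run two inserts in one transaction with a pause between them and compare the queue rows. A sketch against the test table (ids 10 and 11 are arbitrary values chosen for this check):

```sql
-- Two statements in one transaction, with a pause in between.
BEGIN;
INSERT INTO test VALUES (10, 'a', now());
SELECT pg_sleep(0.5);
INSERT INTO test VALUES (11, 'b', now());
COMMIT;

-- Both queue rows should carry the same x_id and the same crt_time,
-- because the deferred trigger fired for both rows at commit.
SELECT x_id, crt_time, new_rec -> 'id' AS row_id
FROM mq.table_change_rec
WHERE table_name = 'test' AND op = 'INSERT' AND new_rec -> 'id' IN ('10', '11')
ORDER BY id;
```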
DDL tracking would use event triggers; it is not covered here and will be added in the next version.
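As a possible starting point for that, a minimal DDL log might look like the sketch below. Everything in it is hypothetical (the table mq.ddl_rec, the function mq.ddl_trace, the event trigger name ddl_trace). It needs PostgreSQL 9.3 or later and superuser rights to create the event trigger, and it records current_query(), which is the whole query string sent by the client and may contain more than one statement.

```sql
-- Hypothetical DDL log table, separate from the DML queue tables.
CREATE TABLE mq.ddl_rec (
  id       serial8 primary key,
  x_id     int8 default txid_current(),                            -- transaction ID
  crt_time timestamp without time zone default clock_timestamp(),  -- when the DDL ran
  tag      text,                                                   -- command tag, e.g. CREATE TABLE
  ddl      text,                                                   -- full query text from the client
  consumed boolean not null default false
);

CREATE OR REPLACE FUNCTION mq.ddl_trace() RETURNS event_trigger
LANGUAGE plpgsql AS $$
BEGIN
  INSERT INTO mq.ddl_rec (tag, ddl) VALUES (tg_tag, current_query());
END;
$$;

-- Fires at the end of each DDL command.
CREATE EVENT TRIGGER ddl_trace ON ddl_command_end EXECUTE PROCEDURE mq.ddl_trace();
```

A consumer following the design notes would compare a DDL row's x_id/crt_time with the DML queue and stop before replaying past it.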
Test
insert into test values (1,'test',now());
insert into test values (2,'你好\a\\''',now());
update test set info='new' where id=1;
delete from test where id=2;
View the tracked records
postgres=# select tableoid::regclass,* from mq.table_change_rec;
-[ RECORD 1 ]+---------------------------------------------------------------------
tableoid | table_change_rec2
x_id | 283001935
consumed | f
relid | 24960
table_schema | public
table_name | test
when_tg | AFTER
level | ROW
op | INSERT
old_rec |
new_rec | "id"=>"1", "info"=>"test", "crt_time"=>"2016-01-05 10:29:10"
crt_time | 2016-01-05 10:29:09.755149
dbname | postgres
username | postgres
client_addr |
client_port |
-[ RECORD 2 ]+---------------------------------------------------------------------
tableoid | table_change_rec2
x_id | 283001936
consumed | f
relid | 24960
table_schema | public
table_name | test
when_tg | AFTER
level | ROW
op | INSERT
old_rec |
new_rec | "id"=>"2", "info"=>"你好\\a\\\\'", "crt_time"=>"2016-01-05 10:29:10"
crt_time | 2016-01-05 10:29:09.762116
dbname | postgres
username | postgres
client_addr |
client_port |
-[ RECORD 3 ]+---------------------------------------------------------------------
tableoid | table_change_rec2
x_id | 283001937
consumed | f
relid | 24960
table_schema | public
table_name | test
when_tg | AFTER
level | ROW
op | UPDATE
old_rec | "id"=>"1", "info"=>"test", "crt_time"=>"2016-01-05 10:29:10"
new_rec | "id"=>"1", "info"=>"new", "crt_time"=>"2016-01-05 10:29:10"
crt_time | 2016-01-05 10:29:09.776981
dbname | postgres
username | postgres
client_addr |
client_port |
-[ RECORD 4 ]+---------------------------------------------------------------------
tableoid | table_change_rec2
x_id | 283001938
consumed | f
relid | 24960
table_schema | public
table_name | test
when_tg | AFTER
level | ROW
op | DELETE
old_rec | "id"=>"2", "info"=>"你好\\a\\\\'", "crt_time"=>"2016-01-05 10:29:10"
new_rec |
crt_time | 2016-01-05 10:29:10.06243
dbname | postgres
username | postgres
client_addr |
client_port |
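The doubled backslashes in new_rec and old_rec above come from hstore's text output, which escapes backslashes and double quotes; the stored value itself is unchanged. One way to confirm this is to read the value with the -> operator:

```sql
-- Raw value vs. escaped hstore text for the row inserted with id = 2.
SELECT new_rec -> 'info' AS raw_value,    -- the value as inserted
       new_rec::text     AS hstore_text   -- backslashes doubled by hstore output
FROM mq.table_change_rec
WHERE op = 'INSERT' AND new_rec -> 'id' = '2';

-- The same effect on a literal, independent of the queue tables:
SELECT hstore('info', E'a\\b') -> 'info' AS raw_value,    -- a\b
       hstore('info', E'a\\b')::text     AS hstore_text;  -- "info"=>"a\\b"
```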
How a consumer subscribes to messages
The steps are broken down below; you can wrap them in a function.
Get the table name.
Build the SQL according to the operation type:
1. insert
Number of entries in new_rec:
array_length(hstore_to_matrix(new_rec),1)            -- 3
Column names:
quote_ident((hstore_to_matrix(new_rec))[1][1])       -- id
quote_ident((hstore_to_matrix(new_rec))[2][1])       -- info
quote_ident((hstore_to_matrix(new_rec))[3][1])       -- crt_time
Values:
quote_nullable((hstore_to_matrix(new_rec))[1][2])    -- '1'
quote_nullable((hstore_to_matrix(new_rec))[2][2])    -- 'test'
quote_nullable((hstore_to_matrix(new_rec))[3][2])    -- '2016-01-04 19:42:36'
......
Assemble the SQL:
insert into table_name(columns) values (values);
2. update
Get the length, column names and values of both new_rec and old_rec.
3. delete
Get the length, column names and values of old_rec.
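The steps above can also be expressed set-wise with hstore's each() and string_agg instead of indexing hstore_to_matrix in a loop. This is a sketch with hypothetical helper names (mq.hstore_insert_sql, mq.hstore_where_sql), not part of the article's mq.build_sql; columns are emitted in key order so that the column list and value list line up.

```sql
-- Hypothetical helper: INSERT statement from (schema, table, new_rec).
CREATE OR REPLACE FUNCTION mq.hstore_insert_sql(name, name, hstore)
RETURNS text LANGUAGE sql STABLE AS $$
  SELECT format('insert into %I.%I (%s) values (%s);',
                $1, $2,
                string_agg(quote_ident(key), ',' ORDER BY key),
                string_agg(quote_nullable(value), ',' ORDER BY key))
  FROM each($3)
$$;

-- Hypothetical helper: WHERE clause for UPDATE/DELETE from old_rec.
-- NULL values become "col is null", others become "col='value'".
CREATE OR REPLACE FUNCTION mq.hstore_where_sql(hstore)
RETURNS text LANGUAGE sql STABLE AS $$
  SELECT string_agg(
           CASE WHEN value IS NULL THEN quote_ident(key) || ' is null'
                ELSE quote_ident(key) || '=' || quote_literal(value)
           END,
           ' and ' ORDER BY key)
  FROM each($1)
$$;

-- Example: replay SQL for pending deletes, in replay order.
SELECT format('delete from %I.%I where %s;', table_schema, table_name, mq.hstore_where_sql(old_rec))
FROM mq.table_change_rec
WHERE op = 'DELETE' AND consumed = false
ORDER BY crt_time, id;
```

Because string_agg only places ' and ' between items, no trailing connector has to be trimmed off afterwards, unlike the rtrim(..., 'd'/'n'/'a') sequence in mq.build_sql.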
The consumer function is written as follows
(it fetches a batch of N rows and keeps each transaction intact):
create or replace function mq.build_sql(n int) returns setof text as $$
declare
m int := 0;
v_table_change_rec mq.table_change_rec;
v_tablename name;
v_crt_time timestamp without time zone;
curs1 refcursor;
v_sql text := '';
v_cols text := '';
v_vals text := '';
v_upd_set text := '';
v_upd_del_where text :='';
v_x_id int8 ;
v_max_crt_time timestamp without time zone;
begin
if n <=0 then
-- raise notice 'n must be > 0.';
return;
end if;
return next 'BEGIN;';
-- pick the queue table holding the oldest unconsumed record
select tablename,crt_time into v_tablename,v_crt_time from
(
select 'table_change_rec0' as tablename,min(crt_time) as crt_time from mq.table_change_rec0 where consumed=false
union all
select 'table_change_rec1' as tablename,min(crt_time) as crt_time from mq.table_change_rec1 where consumed=false
union all
select 'table_change_rec2' as tablename,min(crt_time) as crt_time from mq.table_change_rec2 where consumed=false
union all
select 'table_change_rec3' as tablename,min(crt_time) as crt_time from mq.table_change_rec3 where consumed=false
union all
select 'table_change_rec4' as tablename,min(crt_time) as crt_time from mq.table_change_rec4 where consumed=false
union all
select 'table_change_rec5' as tablename,min(crt_time) as crt_time from mq.table_change_rec5 where consumed=false
union all
select 'table_change_rec6' as tablename,min(crt_time) as crt_time from mq.table_change_rec6 where consumed=false
) t
order by crt_time limit 1;
case v_tablename
when 'table_change_rec0' then
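-- Commit time: the deferred trigger fires the row triggers at transaction end, and mq.get_commit_time() returns one cached timestamp per transaction, so all rows of a transaction carry the same timestamp.
-- Replay follows transaction commit order; the smallest atomic unit is the transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit with the same timestamp and each touches multiple rows, their rows may be replayed interleaved. Batch replay merges them into one transaction, so consistency is not affected; in single-transaction replay, which transaction runs first is arbitrary.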
if n=1 then
select x_id into v_x_id from mq.table_change_rec0 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec0 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec0 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec0 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec0 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
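-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- For tables without a primary key that contain duplicate rows, a source statement that changes a single row via ctid will change every matching duplicate on the target, leaving it inconsistent (avoid ctid, or require a primary key or a non-null unique key)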
case v_table_change_rec.op
when 'INSERT' then
-- build the column list and value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec0 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec0 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec0 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'Do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
when 'table_change_rec1' then
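-- Commit time: the deferred trigger fires the row triggers at transaction end, and mq.get_commit_time() returns one cached timestamp per transaction, so all rows of a transaction carry the same timestamp.
-- Replay follows transaction commit order; the smallest atomic unit is the transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit with the same timestamp and each touches multiple rows, their rows may be replayed interleaved. Batch replay merges them into one transaction, so consistency is not affected; in single-transaction replay, which transaction runs first is arbitrary.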
if n=1 then
select x_id into v_x_id from mq.table_change_rec1 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec1 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec1 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec1 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec1 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
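-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- For tables without a primary key that contain duplicate rows, a source statement that changes a single row via ctid will change every matching duplicate on the target, leaving it inconsistent (avoid ctid, or require a primary key or a non-null unique key)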
case v_table_change_rec.op
when 'INSERT' then
-- build the column list and value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec1 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec1 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec1 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'Do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
when 'table_change_rec2' then
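-- Commit time: the deferred trigger fires the row triggers at transaction end, and mq.get_commit_time() returns one cached timestamp per transaction, so all rows of a transaction carry the same timestamp.
-- Replay follows transaction commit order; the smallest atomic unit is the transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit with the same timestamp and each touches multiple rows, their rows may be replayed interleaved. Batch replay merges them into one transaction, so consistency is not affected; in single-transaction replay, which transaction runs first is arbitrary.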
if n=1 then
select x_id into v_x_id from mq.table_change_rec2 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec2 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec2 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec2 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec2 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
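-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- For tables without a primary key that contain duplicate rows, a source statement that changes a single row via ctid will change every matching duplicate on the target, leaving it inconsistent (avoid ctid, or require a primary key or a non-null unique key)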
case v_table_change_rec.op
when 'INSERT' then
-- build the column list and value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec2 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec2 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec2 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'Do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
when 'table_change_rec3' then
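-- Commit time: the deferred trigger fires the row triggers at transaction end, and mq.get_commit_time() returns one cached timestamp per transaction, so all rows of a transaction carry the same timestamp.
-- Replay follows transaction commit order; the smallest atomic unit is the transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit with the same timestamp and each touches multiple rows, their rows may be replayed interleaved. Batch replay merges them into one transaction, so consistency is not affected; in single-transaction replay, which transaction runs first is arbitrary.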
if n=1 then
select x_id into v_x_id from mq.table_change_rec3 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec3 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec3 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec3 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec3 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
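-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- For tables without a primary key that contain duplicate rows, a source statement that changes a single row via ctid will change every matching duplicate on the target, leaving it inconsistent (avoid ctid, or require a primary key or a non-null unique key)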
case v_table_change_rec.op
when 'INSERT' then
-- build the column list and value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec3 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;
-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec3 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec3 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
when 'table_change_rec4' then
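-- Commit time: each transaction's end time is captured by a deferred trigger, whose row triggers fire at commit and read the time via mq.get_commit_time(), so every row of one transaction gets the same timestamp.
-- Replay order follows transaction commit order; the smallest atomic unit is a transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit at the same instant with identical timestamps and each contains multiple rows, their rows may be replayed interleaved. Batch replay merges them into a single transaction, so consistency is unaffected; in single-transaction replay, which transaction runs first is arbitrary.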
if n=1 then
select x_id into v_x_id from mq.table_change_rec4 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec4 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec4 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec4 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec4 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
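-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- Tables without a primary key: if duplicate rows exist and the source changes only one of them (e.g. via ctid), the replayed WHERE clause matches every duplicate on the target and the two sides diverge (avoid ctid-based changes, or require a primary key or a non-null unique constraint)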
case v_table_change_rec.op
when 'INSERT' then
-- Build the column list and the VALUES list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- Build the SQL statement
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec4 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- Build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec4 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- Build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec4 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
when 'table_change_rec5' then
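-- Commit time: each transaction's end time is captured by a deferred trigger, whose row triggers fire at commit and read the time via mq.get_commit_time(), so every row of one transaction gets the same timestamp.
-- Replay order follows transaction commit order; the smallest atomic unit is a transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit at the same instant with identical timestamps and each contains multiple rows, their rows may be replayed interleaved. Batch replay merges them into a single transaction, so consistency is unaffected; in single-transaction replay, which transaction runs first is arbitrary.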
if n=1 then
select x_id into v_x_id from mq.table_change_rec5 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec5 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec5 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec5 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec5 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
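-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- Tables without a primary key: if duplicate rows exist and the source changes only one of them (e.g. via ctid), the replayed WHERE clause matches every duplicate on the target and the two sides diverge (avoid ctid-based changes, or require a primary key or a non-null unique constraint)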
case v_table_change_rec.op
when 'INSERT' then
-- Build the column list and the VALUES list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- Build the SQL statement
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec5 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- Build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec5 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- Build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec5 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
when 'table_change_rec6' then
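-- Commit time: each transaction's end time is captured by a deferred trigger, whose row triggers fire at commit and read the time via mq.get_commit_time(), so every row of one transaction gets the same timestamp.
-- Replay order follows transaction commit order; the smallest atomic unit is a transaction.
-- When one transaction contains several SQL statements, their order can be distinguished by command id or by a sequence.
-- If several transactions commit at the same instant with identical timestamps and each contains multiple rows, their rows may be replayed interleaved. Batch replay merges them into a single transaction, so consistency is unaffected; in single-transaction replay, which transaction runs first is arbitrary.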
if n=1 then
select x_id into v_x_id from mq.table_change_rec6 where consumed=false order by crt_time,id limit 1;
open curs1 for select * from mq.table_change_rec6 where consumed=false and x_id=v_x_id order by crt_time,id for update;
else
select crt_time into v_crt_time from mq.table_change_rec6 where consumed=false order by crt_time,id limit 1 offset n-1;
if found then
open curs1 for select * from mq.table_change_rec6 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update;
else
-- n exceeds the number of remaining tracked records
open curs1 for select * from mq.table_change_rec6 where consumed=false order by crt_time,id for update;
end if;
end if;
fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
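-- quote_ident escapes schema, table and column names
-- quote_nullable escapes values
-- Tables without a primary key: if duplicate rows exist and the source changes only one of them (e.g. via ctid), the replayed WHERE clause matches every duplicate on the target and the two sides diverge (avoid ctid-based changes, or require a primary key or a non-null unique constraint)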
case v_table_change_rec.op
when 'INSERT' then
-- Build the column list and the VALUES list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;
-- Build the SQL statement
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec6 set consumed=true where current of curs1;
return next v_sql;
when 'UPDATE' then
-- Build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec6 set consumed=true where current of curs1;
return next v_sql;
when 'DELETE' then
-- Build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
else
v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
end if;
end loop;
v_upd_del_where := left(v_upd_del_where, -4) ; -- drop the trailing ' and'
-- Build the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec6 set consumed=true where current of curs1;
return next v_sql;
else
-- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;
else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;
else
-- raise notice 'no handling code for queue table % in this function.', v_tablename;
return;
end case;
end;
$$ language plpgsql strict ;
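To make the string assembly easier to follow, here is a minimal Python model of what each CASE branch of mq.build_sql emits for one queue record. It is an illustration, not the function itself: old_rec/new_rec are assumed to be plain dicts (column name to text or None) standing in for hstore, and quote_ident/quote_nullable are simplified stand-ins for the PostgreSQL functions (no reserved-word or backslash handling). It highlights two points the PL/pgSQL relies on: NULL old values must be matched with IS NULL rather than =, and every value travels as a quoted text literal that the target coerces to the column type.

```python
import re

def quote_ident(name):
    # Simplified stand-in: plain lower-case identifiers stay bare, anything
    # else is double-quoted (the real function also quotes reserved words).
    if re.fullmatch(r"[a-z_][a-z0-9_]*", name):
        return name
    return '"' + name.replace('"', '""') + '"'

def quote_nullable(value):
    # Simplified stand-in: None becomes NULL, text becomes a quoted literal
    # (the real function also handles backslashes with E'' syntax).
    if value is None:
        return "NULL"
    return "'" + value.replace("'", "''") + "'"

def where_clause(old_rec):
    # col = NULL is never true in SQL, so NULL old values need IS NULL.
    parts = []
    for col, val in old_rec.items():
        if val is None:
            parts.append(quote_ident(col) + " is null")
        else:
            parts.append(quote_ident(col) + "=" + quote_nullable(val))
    # Joining with " and " avoids stripping a trailing " and" afterwards.
    return " and ".join(parts)

def build_sql(rec):
    target = quote_ident(rec["table_schema"]) + "." + quote_ident(rec["table_name"])
    op = rec["op"]
    if op == "INSERT":
        new = rec["new_rec"]
        cols = ",".join(quote_ident(c) for c in new)
        vals = ",".join(quote_nullable(v) for v in new.values())
        return f"insert into {target}({cols}) values({vals});"
    if op == "UPDATE":
        sets = ",".join(quote_ident(c) + "=" + quote_nullable(v)
                        for c, v in rec["new_rec"].items())
        return f"update {target} set {sets} where {where_clause(rec['old_rec'])};"
    if op == "DELETE":
        return f"delete from {target} where {where_clause(rec['old_rec'])};"
    return None  # like the PL/pgSQL ELSE branch: unknown ops produce no SQL

if __name__ == "__main__":
    rec = {"table_schema": "public", "table_name": "test", "op": "UPDATE",
           "old_rec": {"id": "1", "info": None},
           "new_rec": {"id": "1", "info": "it's"}}
    print(build_sql(rec))
    # update public.test set id='1',info='it''s' where id='1' and info is null;
```

Because the WHERE clause lists every old column, an UPDATE or DELETE on the target only matches if the target row is byte-for-byte identical to the source's old row, which is why a primary key (or non-null unique key) on replicated tables matters.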
Test data replication with this consumer function:
postgres=# create database src;
CREATE DATABASE
postgres=# create database dest;
CREATE DATABASE
On the source, create hstore, the queue tables and the trigger function
(omitted)
Create the test table and its trigger
src=# create table test(id int primary key,info text,crt_time timestamp);
CREATE TABLE
src=# CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON test DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace();
CREATE TRIGGER
Create the test table on the target
dest=# create table test(id int primary key,info text,crt_time timestamp);
CREATE TABLE
Create the pgbench script
vi test.sql
\setrandom id 1 5000000
insert into test values (:id, md5(random()::text), now()) on conflict ON CONSTRAINT test_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
Run the benchmark
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 48 -j 48 -T 120 src
Meanwhile, in another session, replicate to the dest database in a loop, 10,000 records per iteration:
while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql(10000)) to stdout;commit;' | psql dest -f - >/dev/null ; done
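The 10000 in the loop is the batch-size argument n of mq.build_sql. The Python model below is a hedged sketch of the cursor selection inside that function for a single queue table, with records held in memory instead of mq.table_change_recN (Rec and select_batch are hypothetical names). It shows that n is a lower bound rather than an exact count: with n=1 one whole transaction is replayed, and with larger n the batch extends to every row whose timestamp is not later than the n-th pending row's. This assumes, as the function's comments state, that all rows of one transaction carry the same crt_time.

```python
from dataclasses import dataclass

@dataclass
class Rec:
    id: int          # serial id of the queue row
    x_id: int        # txid_current() of the source transaction
    crt_time: float  # timestamp, modelled as a number
    consumed: bool = False

def select_batch(recs, n):
    """Return the unconsumed records one build_sql(n) call would replay (n >= 1)."""
    pending = sorted((r for r in recs if not r.consumed),
                     key=lambda r: (r.crt_time, r.id))
    if not pending:
        return []
    if n == 1:
        # One transaction: every pending row with the x_id of the earliest row.
        first_xid = pending[0].x_id
        return [r for r in pending if r.x_id == first_xid]
    if n <= len(pending):
        # Everything up to and including the n-th row's timestamp, so rows that
        # share that timestamp are never split across batches.
        cutoff = pending[n - 1].crt_time
        return [r for r in pending if r.crt_time <= cutoff]
    # n exceeds the remaining records: take them all.
    return pending

if __name__ == "__main__":
    recs = [Rec(1, 100, 1.0), Rec(2, 100, 1.0), Rec(3, 100, 1.0), Rec(4, 101, 2.0)]
    print([r.id for r in select_batch(recs, 2)])  # [1, 2, 3]
```

Asking for 2 rows returns 3 here because transaction 100 is kept whole; this is what lets each psql batch in the loop be applied as a unit without breaking a source transaction apart.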
After the sync finishes, check whether the hash values on both sides match.
dest=# \c src
src=# select count(*),now() from test;
count | now
---------+-------------------------------
3910319 | 2016-01-05 15:26:24.046004+08
(1 row)
src=# select sum(hashtext(test.*::text)) from test;
sum
----------------
-1327225009705
(1 row)
src=# \c dest
dest=# select count(*),now() from test;
count | now
---------+-------------------------------
3910319 | 2016-01-05 15:27:27.210017+08
(1 row)
dest=# select sum(hashtext(test.*::text)) from test;
sum
----------------
-1327225009705
(1 row)
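sum(hashtext(test.*::text)) works as a table fingerprint because addition is commutative: the physical row order on dest can differ from src without changing the result. The Python sketch below illustrates the idea with zlib.crc32 standing in for hashtext (an assumption; the two hash functions produce different values, so these numbers are not comparable with the psql output above). Equal sums are strong evidence of identical contents rather than proof, since different row sets can in principle sum to the same value.

```python
import zlib

def signed32(h):
    # hashtext() returns a signed int4; map crc32's unsigned result the same way.
    return h - 2**32 if h >= 2**31 else h

def table_fingerprint(rows):
    """Order-independent fingerprint: sum of per-row 32-bit hashes."""
    return sum(signed32(zlib.crc32(row.encode("utf-8"))) for row in rows)

if __name__ == "__main__":
    src_rows = ["(1,abc,2016-01-05 15:00:00)", "(2,def,2016-01-05 15:00:01)"]
    dest_rows = list(reversed(src_rows))  # same rows, different physical order
    print(table_fingerprint(src_rows) == table_fingerprint(dest_rows))  # True
```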
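Example: replicating tables of a service that is already in production and has accumulated data:
1. Source
First, set up the queue tracking described above (same procedure as earlier in this article).
Then export the data inside a REPEATABLE READ transaction.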
# begin work isolation level REPEATABLE READ;
BEGIN
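-- Record the current snapshot, formatted xmin:xmax:xip_list. xmin is the earliest still-running txid (every lower txid has finished), xmax is the first txid not yet assigned, and xip_list holds the txids still in progress.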
# select * from txid_current_snapshot();
txid_current_snapshot
-----------------------
339148965:339148965:
-- Copy the data to a file
# copy table to '';
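Then, in the mq queue tables, mark as consumed=true the records that must not be replayed because the exported data already reflects them. These are records whose schema + table name match a replicated table and whose x_id satisfies either:
- x_id is lower than xmin, the first field of the recorded snapshot; or
- x_id is at least xmin and lower than xmax (the second field), and is not in the in-progress list (the third field).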
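The marking rule for step 1 amounts to snapshot visibility: a queue row is already contained in the export when its x_id was visible to the export's snapshot (queue rows of aborted transactions are rolled back with them, so only committed work is involved). A minimal Python sketch, assuming queue rows are dicts carrying the mq.table_change_rec columns; records_to_mark_consumed is a hypothetical helper. In SQL the same test should be expressible with PostgreSQL's built-in txid_visible_in_snapshot(x_id, '339148965:339148965:'::txid_snapshot), renamed pg_visible_in_snapshot in PostgreSQL 13.

```python
def parse_snapshot(text):
    """Parse txid_current_snapshot() output 'xmin:xmax:xip1,xip2,...'."""
    xmin, xmax, xip = text.strip().split(":")
    return int(xmin), int(xmax), {int(x) for x in xip.split(",") if x}

def visible_in_snapshot(x_id, snapshot):
    """Same decision rule as txid_visible_in_snapshot()."""
    xmin, xmax, xip = snapshot
    if x_id < xmin:
        return True          # finished before the snapshot was taken
    if x_id >= xmax:
        return False         # not yet assigned when the snapshot was taken
    return x_id not in xip   # in between: visible unless still in progress

def records_to_mark_consumed(records, replicated_tables, snapshot):
    """Hypothetical helper: queue rows whose effect the exported copy already has."""
    return [r for r in records
            if (r["table_schema"], r["table_name"]) in replicated_tables
            and visible_in_snapshot(r["x_id"], snapshot)]

if __name__ == "__main__":
    snap = parse_snapshot("100:105:100,102")
    print([x for x in range(99, 106) if visible_in_snapshot(x, snap)])  # [99, 101, 103, 104]
```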
2. Target
Import the data
# copy table from '';
You can then replicate data with the consumer function above.