金融风控、公安刑侦、社会关系、人脉分析等需求分析与数据库实现 - PostgreSQL图数据库场景应用
作者: digoal
日期: 2016-12-13
标签: PostgreSQL, pgrouting, neo4j, graph database, 图数据库, 金融风险控制, 风控, 刑侦, 社会关系, 近亲, linkedin, 人脉
1. 猎头挖人
2. 公安破案
3. 金融风控
比如银行在审核贷款资格时,通常需要审核申请人是否有偿还能力,是否有虚假消息,行为习惯,资产,朋友圈等等。 同样涉及到复杂的人物关系,人的行为关系分析等等。
详见: https://en.wikipedia.org/wiki/Graph_database
PostgreSQL是一个功能全面的数据库,其中就有一些图数据库产品的后台是使用PostgreSQL的,例如OpenCog, Cayley等。
PostgreSQL 递归查询能满足关系推导的需求吗
PostgreSQL 递归查询是很出彩的功能,还记不记得我写的, 《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》 ,《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》
postgres=# with recursive s as ( select c1,c2 from a where c1=1 union all select a.c1,a.c2 from a join s on (a.c1=s.c2) where a.c1 not in ( with t as (insert into tmp select * from s) select c2 from tmp ) and s.* is not null ) select * from s; ERROR: 42P19: recursive reference to query "s" must not appear within a subquery LINE 4: ... not in (with t as (insert into tmp select * from s) select ... ^ LOCATION: checkWellFormedRecursionWalker, parse_cte.c:773
递归只能排除work table的集合,不能排除整个递归过程的集合,如果关系中存在LOOP则会导致无限LOOP。
图例 :
PostgreSQL UDF解决关系环的问题
postgres=# create table a(c1 int, c2 int, primary key(c1,c2)); CREATE TABLE postgres=# create index idx_a_1 on a(c1); CREATE INDEX postgres=# create index idx_a_2 on a(c2); CREATE INDEX postgres=# insert into a select random()*10000000, random()*10000000 from generate_series(1,100000000) ;
create temp table if not exists tmp1(level int, c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp1_1 on tmp1(level); create index if not exists idx_tmp1_2 on tmp1(c1); create index if not exists idx_tmp1_3 on tmp1(c2); create unlogged table u1 (like tmp1); create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level); create index if not exists idx_tmp2_2 on tmp2(c1); create index if not exists idx_tmp2_3 on tmp2(c2); create unlogged table u2 (like tmp2);
函数1, 输出以A为中心的,N级关系数据。
create or replace function find_rel(v_c1 int, v_level int) returns setof u1 as $$ declare i int := 1; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp1(level int, c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp1_1 on tmp1(level); create index if not exists idx_tmp1_2 on tmp1(c1); create index if not exists idx_tmp1_3 on tmp1(c2); -- 存储初始层级, 即起点的数据 return query insert into tmp1 select i, * from a where c1=v_c1 returning *; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-5-3-4,这里的3会被排除掉)), -- 通过not exists排除打环的点 return query insert into tmp1 select i, a.c1, a.c2 from a join (select c2 from tmp1 where level=i-1 group by c2) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp1 where a.c1 = tmp1.c1) returning *; end loop; end; $$ language plpgsql strict;
函数2, 输出以A为中心的,N级关系数据。
create or replace function find_rel_withpath(v_c1 int, v_level int) returns setof u2 as $$ declare i int := 1; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level); create index if not exists idx_tmp2_2 on tmp2(c1); create index if not exists idx_tmp2_3 on tmp2(c2); -- 存储初始层级, 即起点的数据 return query insert into tmp2 select i, array[]::int[] || c1 || c2 , * from a where c1=v_c1 returning *; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-2-3-4,完全重复的路径,排除掉。 -- 实际中加了c1,c2唯一约束后不可能存在)),通过not exists排除打环的点 -- path用来表示当前路径,是不是很爽 -- 如果有c1,c2的PK约束,可以不使用group by,直接与tmp2关联 return query insert into tmp2 select i, tmp.path||a.c2, a.c1, a.c2 from a join (select c2,path from tmp2 where level=i-1 group by c2,path) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp2 where a.c1 = tmp2.c1) returning *; end loop; end; $$ language plpgsql strict;
create or replace function find_rel_path(v_c1 int, v_c2 int) returns setof int[] as $$ declare i int := 1; begin -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level); create index if not exists idx_tmp2_2 on tmp2(c1); create index if not exists idx_tmp2_3 on tmp2(c2); -- 存储初始层级, 即起点的数据 insert into tmp2 select i, array[]::int[] || c1 || c2 , * from a where c1=v_c1; loop i := i+1; perform 1 from tmp2 where c2=v_c2 limit 1; if found then return query select path from tmp2 where c2=v_c2 and level=i-1; return; end if; insert into tmp2 select i, tmp.path||a.c2, a.c1, a.c2 from a join (select c2,path from tmp2 where level=i-1 group by c2,path) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp2 where a.c1 = tmp2.c1); end loop; end; $$ language plpgsql strict;
1. 找出ID=1这个用户的3级关系网
select * from find_rel(1,3); select * from find_rel_withpath(1,3);
postgres=# select * from find_rel(1,3); NOTICE: relation "tmp1" already exists, skipping NOTICE: relation "idx_tmp1_1" already exists, skipping NOTICE: relation "idx_tmp1_2" already exists, skipping NOTICE: relation "idx_tmp1_3" already exists, skipping level | c1 | c2 -------+---------+--------- 1 | 1 | 5157873 1 | 1 | 3102468 1 | 1 | 8399312 1 | 1 | 1442141 1 | 1 | 5094441 1 | 1 | 1521897 1 | 1 | 401079 2 | 401079 | 1147078 2 | 401079 | 9740100 ...... 3 | 1731998 | 6503196 3 | 1731998 | 5112396 3 | 6525458 | 937613 3 | 6525458 | 8459123 3 | 6525458 | 8419231 3 | 6525458 | 4021987 (828 rows) Time: 15.000 ms postgres=# select * from find_rel_withpath(1,3); NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: relation "idx_tmp2_2" already exists, skipping NOTICE: relation "idx_tmp2_3" already exists, skipping level | path | c1 | c2 -------+-----------------------------+---------+--------- 1 | {1,5157873} | 1 | 5157873 1 | {1,3102468} | 1 | 3102468 1 | {1,8399312} | 1 | 8399312 1 | {1,1442141} | 1 | 1442141 1 | {1,5094441} | 1 | 5094441 1 | {1,1521897} | 1 | 1521897 1 | {1,401079} | 1 | 401079 2 | {1,401079,1147078} | 401079 | 1147078 2 | {1,401079,9740100} | 401079 | 9740100 ...... 3 | {1,1442141,9719411,7811631} | 9719411 | 7811631 3 | {1,401079,9740100,5119416} | 9740100 | 5119416 3 | {1,401079,9740100,350046} | 9740100 | 350046 3 | {1,401079,9740100,8223067} | 9740100 | 8223067 3 | {1,401079,9740100,5608312} | 9740100 | 5608312 3 | {1,401079,9740100,7920319} | 9740100 | 7920319 3 | {1,401079,9740100,6416565} | 9740100 | 6416565 (828 rows) Time: 19.524 ms postgres=# select * from find_rel_withpath(1,5) order by level desc limit 10; NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: relation "idx_tmp2_2" already exists, skipping NOTICE: relation "idx_tmp2_3" already exists, skipping level | path | c1 | c2 -------+----------------------------------------+-----+--------- 5 | {1,401079,3806993,3879310,165,8074824} | 165 | 8074824 5 | {1,401079,3806993,3879310,165,5983603} | 165 | 5983603 5 | {1,401079,3806993,3879310,165,9804825} | 165 | 9804825 5 | {1,401079,3806993,3879310,165,3848025} | 165 | 3848025 5 | {1,401079,3806993,3879310,165,2045526} | 165 | 2045526 5 | {1,401079,3806993,3879310,165,9783524} | 165 | 9783524 5 | {1,401079,3806993,3879310,165,386886} | 165 | 386886 5 | {1,401079,3806993,3879310,165,6318408} | 165 | 6318408 5 | {1,401079,3806993,3879310,165,1150578} | 165 | 1150578 5 | {1,401079,3806993,3879310,165,6702827} | 165 | 6702827 (10 rows) Time: 1473.953 ms
2. 找出ID=1和ID=386886的最短关系路径
select * from find_rel_path(1,386886);
postgres=# select * from find_rel_path(1,386886); NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: relation "idx_tmp2_2" already exists, skipping NOTICE: relation "idx_tmp2_3" already exists, skipping find_rel_path --------------------------------------- {1,401079,3806993,3879310,165,386886} (1 row) Time: 1069.781 ms postgres=# select * from find_rel_path(1,3879310); NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: relation "idx_tmp2_2" already exists, skipping NOTICE: relation "idx_tmp2_3" already exists, skipping find_rel_path ---------------------------- {1,401079,3806993,3879310} (1 row) Time: 17.290 ms
UDF优化思路1 - 游标,流式接收
游标用法参考: https://www.postgresql.org/docs/9.6/static/plpgsql-cursors.html
create or replace function find_rel_withpath_cur(v_c1 int, v_level int) returns setof refcursor as $$ declare i int := 1; ref refcursor[]; res refcursor; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; for x in 1..v_level loop ref[x] := 'a'||x; end loop; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level); create index if not exists idx_tmp2_2 on tmp2(c1); create index if not exists idx_tmp2_3 on tmp2(c2); -- 存储初始层级, 即起点的数据 res := ref[i]; open res for insert into tmp2 select i, array[]::int[] || c1 || c2 , * from a where c1=v_c1 returning *; return next res; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-2-3-4,完全重复的路径,排除掉。 -- 实际中加了c1,c2唯一约束后不可能存在)),通过not exists排除打环的点 -- path用来表示当前路径,是不是很爽 -- 如果有c1,c2的PK约束,可以不使用group by,直接与tmp2关联 res := ref[i]; open res for insert into tmp2 select i, tmp.path||a.c2, a.c1, a.c2 from a join (select c2,path from tmp2 where level=i-1 group by c2,path) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp2 where a.c1 = tmp2.c1) returning *; return next res; end loop; end; $$ language plpgsql strict; postgres=# begin; BEGIN Time: 0.390 ms postgres=# select * from find_rel_withpath_cur(1,5); NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: relation "idx_tmp2_2" already exists, skipping NOTICE: relation "idx_tmp2_3" already exists, skipping find_rel_withpath_cur ----------------------- a1 a2 a3 a4 a5 (5 rows) Time: 4.008 ms
postgres=# fetch all in a1; level | path | c1 | c2 -------+-------------+----+--------- 1 | {1,5157873} | 1 | 5157873 1 | {1,3102468} | 1 | 3102468 1 | {1,8399312} | 1 | 8399312 1 | {1,1442141} | 1 | 1442141 1 | {1,5094441} | 1 | 5094441 1 | {1,1521897} | 1 | 1521897 1 | {1,401079} | 1 | 401079 (7 rows) Time: 0.958 ms
postgres=# fetch all in a2; level | path | c1 | c2 -------+---------------------+---------+--------- 2 | {1,401079,1147078} | 401079 | 1147078 2 | {1,401079,9740100} | 401079 | 9740100 2 | {1,401079,8171779} | 401079 | 8171779 2 | {1,401079,3806993} | 401079 | 3806993 2 | {1,401079,2491387} | 401079 | 2491387 ...... 2 | {1,8399312,5886963} | 8399312 | 5886963 2 | {1,8399312,3652462} | 8399312 | 3652462 2 | {1,8399312,8148713} | 8399312 | 8148713 2 | {1,8399312,8282991} | 8399312 | 8282991 (75 rows) Time: 3.173 ms
postgres=# fetch all in a3; level | path | c1 | c2 -------+-----------------------------+---------+--------- 3 | {1,1521897,47614,2653114} | 47614 | 2653114 3 | {1,1521897,47614,1354306} | 47614 | 1354306 3 | {1,1521897,47614,7452721} | 47614 | 7452721 ... 3 | {1,401079,9740100,7920319} | 9740100 | 7920319 3 | {1,401079,9740100,6416565} | 9740100 | 6416565 (746 rows) Time: 25.455 ms postgres=# fetch all in a4; ...... 4 | {1,401079,8171779,9968575,6388546} | 9968575 | 6388546 4 | {1,401079,8171779,9968575,8281935} | 9968575 | 8281935 4 | {1,401079,8171779,9968575,6076729} | 9968575 | 6076729 4 | {1,401079,8171779,9968575,7087557} | 9968575 | 7087557 (7383 rows) Time: 14.482 ms
postgres=# begin; BEGIN Time: 0.561 ms postgres=# select * from find_rel_withpath_cur(1,5); NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: relation "idx_tmp2_2" already exists, skipping NOTICE: relation "idx_tmp2_3" already exists, skipping find_rel_withpath_cur ----------------------- a1 a2 a3 a4 a5 (5 rows) Time: 2.161 ms
postgres=# fetch all in a4; level | path | c1 | c2 -------+------+----+---- (0 rows) Time: 0.738 ms postgres=# fetch all in a5; level | path | c1 | c2 -------+------+----+---- (0 rows) Time: 0.727 ms postgres=# fetch all in a1; level | path | c1 | c2 -------+-------------+----+--------- 1 | {1,5157873} | 1 | 5157873 1 | {1,3102468} | 1 | 3102468 1 | {1,8399312} | 1 | 8399312 1 | {1,1442141} | 1 | 1442141 1 | {1,5094441} | 1 | 5094441 1 | {1,1521897} | 1 | 1521897 1 | {1,401079} | 1 | 401079 (7 rows) Time: 0.954 ms
create or replace function find_rel_withpath_cur_array(v_c1 int, v_level int) returns setof refcursor as $$ declare i int := 1; ref refcursor[]; res refcursor; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; for x in 1..v_level loop ref[x] := 'a'||x; end loop; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level); create index if not exists idx_tmp2_2 on tmp2(c1); create index if not exists idx_tmp2_3 on tmp2(c2); -- 存储初始层级, 即起点的数据 res := ref[i]; open res for insert into tmp2 select i as level, array[]::int[] || c1 || c2 as path , c1, c2 from (select c1,unnest(c2) as c2 from a where c1=v_c1) a returning *; return next res; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-2-3-4,完全重复的路径,排除掉。 -- 实际中加了c1,c2唯一约束后不可能存在)),通过not exists排除打环的点 -- path用来表示当前路径,是不是很爽 -- 如果有c1,c2的PK约束,可以不使用group by,直接与tmp2关联 res := ref[i]; open res for insert into tmp2 select i as level, a.path||a.c2 as path, a.c1, a.c2 from (select tmp2.path, a.c1, unnest(a.c2) c2 from a join tmp2 on (a.c1=tmp2.c2 and tmp2.level=i-1 and tmp2.c1<>a.c1)) a returning *; return next res; end loop; end; $$ language plpgsql strict;
目前PostgreSQL的insert into ... returning *要等insert结束才能返回第一条记录,所以层级越深,插入时间越长,层级越深的数据接收可能越慢。那么还有更好的优化手段吗。
除了修改内核(这是最好的方法),让其支持insert into ... returning *的游标支持流式返回,还有一种方法,在函数中流式返回。
postgres=# create table a(c1 int, c2 int[], primary key (c1)); CREATE TABLE vi test.sql \set id random(1,100000000) insert into a select :id, (select array_agg(((width_bucket(:id,1,100000000,2000)-1)*50000 + (random()*50000)::int)) from generate_series(1,1000)) on conflict do nothing; pgbench -M prepared -n -r -P 5 -f ./test.sql -c 64 -j 64 -T 100000
create or replace function find_rel_withpath_cur(v_c1 int, v_level int) returns setof record as $$ declare i int := 1; ref1 cursor(var1 int, var2 int) for select var1 as level, array[]::int[] || c1 || c2 as path , c1, c2 from (select c1,unnest(c2) as c2 from a where c1=var2) a ; ref2 cursor(var1 int) for select var1 as level, a.path||a.c2 as path, a.c1, a.c2 from (select tmp2.path, a.c1, unnest(a.c2) c2 from a join tmp2 on (a.c1=tmp2.c2 and tmp2.level=i-1 and tmp2.c1<>a.c1)) a; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 -- 目前plpgsql不支持流式返回, 即使使用return next , return query -- https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html create temp table if not exists tmp2(level int, path int[] unique, c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level, c2); -- 存储初始层级, 即起点的数据 for rec in ref1(i, v_c1) loop insert into tmp2 values (rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing; if found then return next rec; end if; end loop; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-2-3-4,完全重复的路径,排除掉。 -- 实际中加了c1,c2唯一约束后不可能存在)),通过not exists排除打环的点 -- path用来表示当前路径,是不是很爽 -- 如果有c1,c2的PK约束,可以不使用group by,直接与tmp2关联 for rec in ref2(i) loop insert into tmp2 values(rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing; if found then return next rec; end if; end loop; end loop; return; end; $$ language plpgsql strict;
目前plpgsql不支持流式返回, 即使使用return next , return query, https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html
Note: The current implementation of RETURN NEXT and RETURN QUERY stores the entire result set before returning from the function, as discussed above. That means that if a PL/pgSQL function produces a very large result set, performance might be poor: data will be written to disk to avoid memory exhaustion, but the function itself will not return until the entire result set has been generated. A future version of PL/pgSQL might allow users to define set-returning functions that do not have this limitation. Currently, the point at which data begins being written to disk is controlled by the work_mem configuration variable. Administrators who have sufficient memory to store larger result sets in memory should consider increasing this parameter.
目前使用c function实现以上逻辑,可以实现函数的流式接收。
-- 消除重复NODE,例如有N条路径经过同一个NODE,只记录一个NODE。 create or replace function find_rel(v_c1 int, v_level int) returns setof u1 as $$ declare i int := 1; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp1( level int, c1 int primary key) ON COMMIT delete rows; create index if not exists idx_tmp1_1 on tmp1(level, c1); -- 存储初始层级, 即起点的数据 return query insert into tmp1 select i, c1 from a where c1=v_c1 union select i, unnest(c2) from a where c1=v_c1 returning *; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-5-3-4,这里的3会被排除掉)), -- 通过not exists排除打环的点 return query insert into tmp1 select t.i, t.c2 from ( select i, unnest(c2) c2 from a join tmp1 tmp on ( a.c1=tmp.c1 and tmp.level=i-1 and a.c1<>v_c1 ) ) t left join tmp1 on (t.c2=tmp1.c1) where tmp1.* is null group by 1,2 on conflict do nothing returning *; end loop; end; $$ language plpgsql strict; postgres=# select count(*) from find_rel(5000,10); NOTICE: relation "tmp1" already exists, skipping NOTICE: relation "idx_tmp1_1" already exists, skipping count ------- 50001 (1 row)
Time: 17131.492 ms
create or replace function find_rel_cur(v_c1 int, v_level int) returns setof refcursor as $$ declare i int := 1; ref refcursor[]; res refcursor; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; for x in 1..v_level loop ref[x] := 'a'||x; end loop; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 create temp table if not exists tmp1( level int, c1 int primary key) ON COMMIT delete rows; create index if not exists idx_tmp1_1 on tmp1(level, c1); -- 存储初始层级, 即起点的数据 res := ref[i]; open res for insert into tmp1 select i, c1 from a where c1=v_c1 union select i, unnest(c2) from a where c1=v_c1 returning *; return next res; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-5-3-4,这里的3会被排除掉)), -- 通过not exists排除打环的点 res := ref[i]; open res for insert into tmp1 select t.i, t.c2 from ( select i, unnest(c2) c2 from a join tmp1 tmp on ( a.c1=tmp.c1 and tmp.level=i-1 and a.c1<>v_c1 ) ) t left join tmp1 on (t.c2=tmp1.c1) where tmp1.* is null group by 1,2 on conflict do nothing returning *; return next res; end loop; end; $$ language plpgsql strict; postgres=# select * from find_rel_cur(1,10); NOTICE: relation "tmp1" already exists, skipping NOTICE: relation "idx_tmp1_1" already exists, skipping find_rel_cur -------------- a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 (10 rows) Time: 2.098 ms postgres=# fetch 1 in a1; level | c1 -------+------- 1 | 37394 (1 row) Time: 3.138 ms postgres=# fetch 1 in a2; level | c1 -------+---- 2 | 0 (1 row) Time: 1345.473 ms
由于测试模型的问题,第三级搜索了5000万记录, 正常的业务模型不会如此,不必担心
postgres=# fetch 1 in a3; level | c1 -------+---- (0 rows) Time: 15587.686 ms postgres=# fetch 1 in a4; level | c1 -------+---- (0 rows) Time: 0.143 ms postgres=#
create extension dblink; CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator; CREATE SERVER dst FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '', port '1921', dbname 'postgres'); CREATE USER MAPPING FOR postgres SERVER dst OPTIONS (user 'postgres', password 'postgres'); create or replace function find_rel_withpath_notify(v_c1 int, v_level int, v_notify_channel text) returns void as $$ declare i int := 1; query text; ref1 cursor(var1 int, var2 int) for select var1 as level, array[]::int[] || c1 || c2 as path , c1, c2 from (select c1,unnest(c2) as c2 from a where c1=var2) a ; ref2 cursor(var1 int) for select var1 as level, a.path||a.c2 as path, a.c1, a.c2 from (select tmp2.path, a.c1, unnest(a.c2) c2 from a join tmp2 on (a.c1=tmp2.c2 and tmp2.level=i-1 and tmp2.c1<>a.c1)) a; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; -- 判断连接是否存在, 不存在则创建. if array_position(dblink_get_connections(), v_notify_channel) is not null then else perform dblink_connect(v_notify_channel, 'dst'); end if; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 -- 目前plpgsql不支持流式返回, 即使使用return next , return query -- https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html create temp table if not exists tmp2(level int, path int[] unique, c1 int, c2 int) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level, c2); -- 存储初始层级, 即起点的数据 for rec in ref1(i, v_c1) loop insert into tmp2 values (rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing; if found then query := format($_$select 1 from pg_notify( %L , 'level: %s, path: %s, c1: %s, c2: %s')$_$, v_notify_channel, rec.level, rec.path, rec.c1, rec.c2); -- 发送异步消息 perform * from dblink(v_notify_channel, query, true) as t(id int); end if; end loop; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; -- 通过与level=i-1的JOIN推导下一级关系(group by排除重复node(比如1-2-3-4, 1-2-3-4,完全重复的路径,排除掉。 -- 实际中加了c1,c2唯一约束后不可能存在)),通过not exists排除打环的点 -- path用来表示当前路径,是不是很爽 -- 如果有c1,c2的PK约束,可以不使用group by,直接与tmp2关联 for rec in ref2(i) loop insert into tmp2 values(rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing; if found then query := format($_$select 1 from pg_notify( %L , 'level: %s, path: %s, c1: %s, c2: %s')$_$, v_notify_channel, rec.level, rec.path, rec.c1, rec.c2); -- 发送异步消息 perform * from dblink(v_notify_channel, query, true) as t(id int); end if; end loop; end loop; return; end; $$ language plpgsql strict;
postgres=# listen hello;
postgres=# select find_rel_withpath_notify(1,5,'hello');
。。。。。。 Asynchronous notification "hello" with payload "level: 2, path: {1,32847,15869}, c1: 32847, c2: 15869" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,21312}, c1: 32847, c2: 21312" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,22852}, c1: 32847, c2: 22852" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,8031}, c1: 32847, c2: 8031" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,45248}, c1: 32847, c2: 45248" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,7139}, c1: 32847, c2: 7139" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,28589}, c1: 32847, c2: 28589" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,8615}, c1: 32847, c2: 8615" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,49518}, c1: 32847, c2: 49518" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,35727}, c1: 32847, c2: 35727" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,2679}, c1: 32847, c2: 2679" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,34267}, c1: 32847, c2: 34267" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,13890}, c1: 32847, c2: 13890" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,20092}, c1: 32847, c2: 20092" received from server process with PID 33062. Asynchronous notification "hello" with payload "level: 2, path: {1,32847,11795}, c1: 32847, c2: 11795" received from server process with PID 33062. 。。。。。。
设计优化1 - 数组现身
用法也很简单,本文就不展示了,数组支持= any(array)的查询,在FUNCTION中支持foreach的循环等。
设计优化2 - 柳暗花明又一春, pgrouting
a(c1, int, c2 int, weight numeric).
1. 数组,用于存储正向关系
2. plpgsql,用于编写推导逻辑,路径搜索逻辑等
3. 游标,用于流式返回
4. 异步消息,用于流式数据返回
4. 聚合查询
5. pgrouting,用于最短路径搜索
6. 递归查询
7. 关于亲密度分析,可以使用类似PostgreSQL rum的插件计算方法,比如计算共同的事件重叠度打分(目前RUM已支持tsvector的近似度值输出和排序,可以用于亲密度计算).
rum参考: 《PostgreSQL 全文检索加速 快到没有朋友 - RUM索引接口(潘多拉魔盒)》
CREATE EXTENSION rum; create table rum1(c1 int, c2 tsvector, primary key (c1)); create index idx_rum1_1 on rum1 using rum(c2 rum_tsvector_ops); vi test.sql \set id random(1,100000000) insert into rum1 select :id, (select to_tsvector(string_agg(((width_bucket(:id,1,100000000,2000)-1)*50000 + (random()*50000)::int)::text, ' ')) from generate_series(1,100)) on conflict do nothing; pgbench -M prepared -n -r -P 5 -f ./test.sql -c 64 -j 64 -T 100000
postgres=# select c1, c2 <=> tsq as dis from rum1, (select to_tsquery(replace(rtrim(ltrim(array_to_tsvector(tsvector_to_array(c2))::text, ''''), ''''), $$' '$$, ' | ')) tsq from rum1 where c1=1) as tmp where c2 @@ tsq and c1<>33233490 order by c2 <=> tsq limit 10;
c1 | dis -------+---------- 1 | 0.164493 42469 | 4.11234 28939 | 5.48311 45740 | 5.48311 15508 | 5.48311 11589 | 5.48311 12377 | 5.48311 34282 | 5.48311 16731 | 5.48311 6474 | 5.48311 (10 rows) Time: 259.834 ms
create or replace function find_rel_pagerank_cur( v_c1 int, -- 需要推导的ID v_level int, -- 推导几级关系 v_rank numeric, -- 亲密度阈值,大于该值的不输出(越大距离越远, 越不亲密) v_limit_perlevel int -- 每一级输出的最大记录数(如人数) ) returns setof record as $$ declare i int := 1; i_c1 int; ref cursor( var_level int, var_c1 int ) for select level,c1,c2,pagerank::numeric from (select var_level as level, var_c1 as c1, c1 as c2, c2 <=> tsq as pagerank from rum1, (select to_tsquery(replace(rtrim(ltrim(array_to_tsvector(tsvector_to_array(c2))::text, ''''), ''''), $_$' '$_$, ' | ')) tsq from rum1 where c1=var_c1) as tmp where c2 @@ tsq and c1<>var_c1 order by c2 <=> tsq limit v_limit_perlevel ) t where t.pagerank < v_rank ; begin if v_level <=0 then raise notice 'level must >=1'; return; end if; -- 9.6还是inner temp table,频繁创建和删除,可能导致catalog产生垃圾,需要注意一下。 -- 用来临时存储从起点开始, 牵扯出来的每一层级的关系 -- 目前plpgsql不支持流式返回, 即使使用return next , return query -- https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html create temp table if not exists tmp2(level int, c1 int, c2 int, pagerank numeric, primary key(c1,c2)) ON COMMIT delete rows; create index if not exists idx_tmp2_1 on tmp2(level, c2); -- 存储初始层级, 即起点的数据 for rec in ref(i,v_c1) loop insert into tmp2 values (rec.level, rec.c1, rec.c2, rec.pagerank) on conflict do nothing; if found then raise notice 'level: %, c1: %, c2:% ,pagerank: %', rec.level, rec.c1, rec.c2, rec.pagerank; return next rec; end if; end loop; loop i := i+1; -- 已找到所有层级的数据 if i > v_level then return; end if; for i_c1 in select t2.c1 from rum1 t2 JOIN tmp2 t1 on (t1.c2=t2.c1 and t1.level=i-1 and t2.c1<>v_c1) where not exists (select 1 from tmp2 where tmp2.c1=t2.c1) group by 1 loop for rec in ref(i,i_c1) loop insert into tmp2 values (rec.level, rec.c1, rec.c2, rec.pagerank) on conflict do nothing; if found then raise notice 'level: %, c1: %, c2:% ,pagerank: %', rec.level, rec.c1, rec.c2, rec.pagerank; return next rec; end if; end loop; end loop; end loop; return; end; $$ language plpgsql strict; postgres=# select * from find_rel_pagerank_cur(96807211,2,10000,10) as t(level int, c1 int, c2 int, pagerank numeric); NOTICE: relation "tmp2" already exists, skipping NOTICE: relation "idx_tmp2_1" already exists, skipping NOTICE: level: 1, c1: 96807211, c2:96810420 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96849305 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96810740 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96839717 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96849378 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96800097 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96832351 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96839438 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96816466 ,pagerank: 5.48311 NOTICE: level: 1, c1: 96807211, c2:96836416 ,pagerank: 5.48311 NOTICE: level: 2, c1: 96800097, c2:96812430 ,pagerank: 4.11234 NOTICE: level: 2, c1: 96800097, c2:96802051 ,pagerank: 5.48311 NOTICE: level: 2, c1: 96800097, c2:96824209 ,pagerank: 5.48311 ......
1. insert into .. returning .. 游标支持流式返回, 目前需要等insert结束才返回returning的结果, 无法同时插入同时return
2. with recursive 递归查询目前仅支持支持work table中间状态的查询,建议支持全量work table查询。
3. with recursive 递归查询,建议支持LOOPth的变量,知道进入第几个循环了。
4. plpgsql return next, return query支持流式返回。 (目前要在FUNCTION中返回流式数据,还只能写C的接口,手册中有指出,未来可能会对plpgsql添加流式返回的支持)
5. 可以增强的点,PostgreSQL的索引接口是完全开放的,索引存储怎么组织,怎么检索,用户都可以自定义。
可以参考rum, bloom index的写法。 https://github.com/postgrespro/rum https://www.postgresql.org/docs/9.6/static/xindex.html https://www.postgresql.org/docs/9.6/static/bloom.html
1. neo4j
2. 基于PG的graph database
3. pgrouting
pgrouting 与双十一的物流路径规划
《聊一聊双十一背后的技术 - 物流, 动态路径规划》
4. PostgreSQL facebook linkbench社交关系模型性能测试
5. 递归查询的几个例子
《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》
《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》
https://hx.tiancebbs.cn/qths/471495.html https://taicang.tiancebbs.cn/hjzl/458351.html https://www.tiancebbs.cn/ershouwang/472578.html https://zulin.tiancebbs.cn/sh/1591.html https://zulin.tiancebbs.cn/sh/316.html https://hf.tiancebbs.cn/xiudiannao/54819.html https://www.tiancebbs.cn/ershoufang/470377.html https://zulin.tiancebbs.cn/sh/3624.html https://www.tiancebbs.cn/ershouwang/474355.html https://aihuishou.tiancebbs.cn/sh/1648.html https://dg.tiancebbs.cn/xiudiannao/58249.html https://www.tiancebbs.cn/ershouwang/472535.html https://aihuishou.tiancebbs.cn/sh/3699.html https://changshushi.tiancebbs.cn/hjzl/462800.html https://www.tiancebbs.cn/ershouwang/474332.html https://www.tiancebbs.cn/ershouwang/474259.html https://taicang.tiancebbs.cn/hjzl/462483.html
http://fuyang.tjtcbmw.cn/jdzr/ http://taiying.njtcbmw.cn/cpq/ http://yz.cqtcxxw.cn/luwansh/ http://shenghuo.china-bbs.com/cpq/ https://nhqshi.tiancebbs.cn/ https://jmtaishan.tiancebbs.cn/ http://shenghuo.china-bbs.com/jinzhong/ http://nalei.zjtcbmw.cn/tjxq/ http://gx.lztcxxw.cn/dingxi/ http://cf.lstcxxw.cn/yspx/ https://daozuo.tiancebbs.cn/ http://taiying.njtcbmw.cn/hbes/ https://duruanzhen.tiancebbs.cn/ https://fenlei.tiancebbs.cn/jdzr/ https://ju.tiancebbs.cn/ https://hepinggongyuan.tiancebbs.cn/ http://ly.shtcxxw.cn/henan/
转正工作总结:https://www.nanss.com/gongzuo/2777.html 奖学金申请书:https://www.nanss.com/xuexi/2742.html 疫情防控知识宣传:https://www.nanss.com/shenghuo/2603.html 小学毕业赠言给老师:https://www.nanss.com/xuexi/2431.html 我的爱好作文:https://www.nanss.com/xuexi/2945.html 毕业总结:https://www.nanss.com/xuexi/2556.html 实习鉴定评语:https://www.nanss.com/gongzuo/2694.html 自我介绍简短有趣:https://www.nanss.com/xuexi/2701.html 护士个人总结:https://www.nanss.com/gongzuo/2574.html 民主评议党员工作总结:https://www.nanss.com/gongzuo/2548.html 疫情期间的暖心话简短:https://www.nanss.com/yulu/2914.html 春天发朋友圈的精美句子:https://www.nanss.com/wenan/2244.html 一件令我感动的事:https://www.nanss.com/xuexi/2962.html 批评与自我批评:https://www.nanss.com/gongzuo/2675.html 简历自我评价:https://www.nanss.com/gongzuo/2682.html 爱情誓言:https://www.nanss.com/yulu/2735.html 防汛工作总结:https://www.nanss.com/gongzuo/2966.html 团员教育评议表自我评价:https://www.nanss.com/xuexi/2563.html 乔迁之喜贺词:https://www.nanss.com/shenghuo/2925.html 劳动仲裁申请书:https://www.nanss.com/gongzuo/2650.html 运动会通讯稿:https://www.nanss.com/xuexi/2918.html 教研组工作总结:https://www.nanss.com/gongzuo/2585.html 自查自纠报告:https://www.nanss.com/gongzuo/2702.html 蚂蚁搬家作文:https://www.nanss.com/xuexi/2957.html 实习单位指导教师评语:https://www.nanss.com/xuexi/2794.html 宣传工作总结:https://www.nanss.com/gongzuo/2977.html 一句最让男人内疚的话:https://www.nanss.com/yulu/2531.html 工作思路:https://www.nanss.com/gongzuo/2616.html 最牛的游戏名字:https://www.nanss.com/mingcheng/2529.html 会议主持词:https://www.nanss.com/gongzuo/2761.html