摘要

总所周知,阿里云的 PostgreSQL 和 HybridDB for PostgreSQL 和 oss 是全面互通的。HybridDB for PostgreSQL 由于是 MPP 架构天生包括多个计算节点,能够以为并发的方式读写 oss 上的数据。PostgreSQL 在这方面要差一点,默认情况下只能单进程读写 OSS,不过通过 dblink 的加持,我们也能让 OSS 中的数据快速装载到 PostgreSQL。本文就给大家讲讲这其中的黑科技。

一.准备工作

首先,创建我们要用到的插件。

create extension dblink;
create extension oss_fdw;

二.创建异步化存储过程

-- 异步数据装载的准备工作
CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_prepare(t_from text, t_to text)
  RETURNS bool AS
$BODY$
DECLARE
  t_exist  int;
  curs1 refcursor;
  r  record;
  filepath text;
  fileindex int8;
  s1 text;
  s2 text;
  s3 text;
  c int = 0;
  s4 text;
  s5 text;
  ss4 text;
  ss5 text;
  sql text;
BEGIN
  create table if not exists oss_fdw_load_status(id BIGSERIAL primary key, filename text, size int8, rows int8 default 0, status int default 0);
​
  select count(*) into t_exist from oss_fdw_load_status;
​
  if t_exist != 0 then
    RAISE NOTICE 'oss_fdw_load_status not empty';
    return false;
  end if;
​
  -- 通过 oss_fdw_list_file 函数,把外部表 t_from 匹配的 OSS 中的文件列到表中
  insert into oss_fdw_load_status (filename, size) select name,size from oss_fdw_list_file(t_from) order by size desc;
​
  select count(*) into t_exist from oss_fdw_load_status;
  if t_exist = 0 then
    RAISE NOTICE 'oss_fdw_load_status empty,not task found';
    return false;
  end if;
​
  return true;
END;
$BODY$
  LANGUAGE plpgsql;
​
-- 数据装载的工作函数
CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_execute(t_from text, t_to text, num_work int, pass text)
  RETURNS bool AS
$BODY$
DECLARE
  t_exist  int;
  curs1 refcursor;
  r  record;
  filepath text;
  fileindex int8;
  s1 text;
  s2 text;
  s3 text;
  c int = 0;
  s4 text;
  s5 text;
  ss4 text;
  ss5 text;
  sql text;
  db text;
  user text;
BEGIN
  select count(*) into t_exist from oss_fdw_load_status;
  if t_exist = 0 then
    RAISE NOTICE 'oss_fdw_load_status empty';
    return false;
  end if;
​
  s4 = 'oss_loader';
  s5 = 'idle';
  ss4 = '''' || s4 ||'''';
  ss5 = '''' || s5 ||'''';
  sql = 'select count(*) from pg_stat_activity where application_name = ' || ss4 || ' and state != ' || ss5;
​
  select current_database() into db;
  select current_user into user;
​
  -- 通过游标,不断获取单个任务
  OPEN curs1 FOR SELECT id, filename FROM oss_fdw_load_status order by id;
  loop
    fetch curs1 into r;
    if not found then
      exit;
    end if;
    fileindex = r.id;
    filepath = r.filename;
​
    s1 = '''' || t_from ||'''';
    s2 = '''' || t_to ||'''';
    s3 = '''' || filepath ||'''';
​
    LOOP
      -- 查看当前正在工作的任务数,过达到并发数就在这里等待
      select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);
      IF c < num_work THEN
        EXIT;
      END IF;
      RAISE NOTICE 'current runing % loader', c;
      perform pg_sleep(1);
    END LOOP;
​
    -- 通过 DBLINK 创建异步任务
    perform dis_conn('oss_loader_'||fileindex);
    perform dblink_connect('oss_loader_'||fileindex, 'dbname='||db ||' user='||user || ' application_name=oss_loader' || ' password='||pass);
    perform dblink_send_query('oss_loader_'||fileindex, format('
      begin;
      select rds_oss_fdw_load_single_file(%s,%s,%s,%s);
      end;'
      , fileindex, s1, s2, s3)
    );
    RAISE NOTICE 'runing loader task % filename %',fileindex, filepath;
  end loop;
  close curs1;
​
  -- 任务分配完成,等待所有任务完成
  LOOP
    select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);
    IF c = 0 THEN
      EXIT;
    END IF;
    RAISE NOTICE 'current runing % loader', c;
    perform pg_sleep(1);
  END LOOP;
​
  return true;
END;
$BODY$
  LANGUAGE plpgsql;
​
-- 单个文件的数据装在函数
CREATE OR REPLACE FUNCTION rds_oss_fdw_load_single_file(taskid int8, t_from text, t_to text, filepath text)
  RETURNS void AS
$BODY$
DECLARE
  rowscount int8 = 0;
  current text;
  sql text;
BEGIN
  -- 配置 GUC 参数,指定要导入的 OSS 上的文件
  perform set_config('oss_fdw.rds_read_one_file',filepath,true);
  select current_setting('oss_fdw.rds_read_one_file') into current;
  RAISE NOTICE 'begin load %', current;
​
  -- 通过动态 SQL 导入数据
  EXECUTE 'insert into '|| t_to || ' select * from ' || t_from;
  GET DIAGNOSTICS rowscount = ROW_COUNT;
​
  -- 导入完成后,把结果保存到状态表中
  RAISE NOTICE 'end load id % % to % % rows', taskid, filepath, t_to, rowscount;
  update oss_fdw_load_status set rows = rowscount,status = 1 where id = taskid;
  return;
​
EXCEPTION
  when others then
  RAISE 'run rds_oss_fdw_load_single_file with error';
END;
$BODY$
  LANGUAGE plpgsql;
​
-- 关闭连接不报错
create or replace function dis_conn(name) returns void as $$  
declare  
begin  
  perform dblink_disconnect($1);  
  return;  
exception when others then  
  return;  
end;  
$$ language plpgsql strict;

三.使用函数装载数据

准备数据

select rds_oss_fdw_load_data_prepare('oss_table','lineitem');

执行后,会看到表 oss_fdw_load_status 中,保存了准备导入的所有文件列表,用户可以做适当的删减定制。

2. 数据装载

select rds_oss_fdw_load_data_execute('oss_table','lineitem',10,'mypassword');

函数 rds_oss_fdw_load_data_execute 会等待数据导入的完成才返回。

3. 查询状态

期间,我们可以通过下列 SQL 查看正在工作的异步会话状态

select application_name, state, pid,query, now() - xact_start as xact  from pg_stat_activity where state != 'idle' and application_name='oss_loader' order by xact desc;

4.管理状态

同时,我们也可以随时中断数据导入工作

select pg_terminate_backend(pid),application_name, state ,query from pg_stat_activity where state != 'idle' and pid != pg_backend_pid() and application_name='oss_loader';

5. 查看进度

我们也很容易看到整个数据装载的进度(单位 MB)

select
(
select sum(size)/1024/1024 as complete from oss_fdw_load_status where status = 1
)a,
(
select sum(size)/1024/1024 as full from oss_fdw_load_status
)b;

6. 性能

使用 TPCC 100GB的数据进行装载测试,耗时 10 分钟,平均 170MB/S

select rds_oss_fdw_load_data_prepare('t_oss2','lineitem');
​
select rds_oss_fdw_load_data_execute('t_oss2','lineitem',10,'123456Zwj');
​
select sum(size)/1024/1024 from oss_fdw_load_status;
      ?column?      
--------------------
 22561.919849395752
(1 row)
​
select pg_size_pretty(pg_relation_size(oid)) from pg_class where relname = 'lineitem';
 pg_size_pretty 
----------------
 101 GB
(1 row)

总结

本文使用 plsql + dblink 的方式加速了 OSS 的数据导入。另外,大家也可以关注到以下三点

1. PostgreSQL 默认的过程语言 pl/pgsql 相当好用,和 SQL 引擎紧密结合且学习成本低。我们推荐用户把业务逻辑用它实现。使用过程语言相对于在客户端执行 SQL,消除了服务器到和客户端的网络开销,有天然的性能优势。

2. dblink 的异步接口非常适合做性能加速,且和过程语言紧密结合。推荐在 SQL 和 过程语言中使用。

3. 阿里云开发的 oss_fdw 能在 PostgreSQL 和 OSS 之间做快速的数据交换。oss_fdw 支持 CSV 和压缩方式 CSV 数据的读和写,且很容易用并行加速。oss_fdw 的性能相对于 jdbc insert 和 copy 有压倒的性能优势。

PostgreSQL中文社区欢迎广大技术人员投稿

投稿邮箱:press@postgres.cn

CENTER_PostgreSQL_Community

请在登录后发表评论,否则无法保存。
1楼 xcvxcvsdf
2024-10-27 09:22:33+08

https://zulin.tiancebbs.cn/sh/1599.html https://nujiang.tiancebbs.cn/qths/471027.html https://taicang.tiancebbs.cn/hjzl/461165.html https://taicang.tiancebbs.cn/hjzl/459809.html https://aihuishou.tiancebbs.cn/sh/3406.html https://sh.tiancebbs.cn/hjzl/470489.html https://wujiangqu.tiancebbs.cn/jiajiao/213036.html https://sh.tiancebbs.cn/hjzl/471452.html https://www.tiancebbs.cn/gguandao/57989.html https://aihuishou.tiancebbs.cn/sh/341.html https://aihuishou.tiancebbs.cn/sh/4565.html https://ys.tiancebbs.cn/qths/474199.html https://aihuishou.tiancebbs.cn/sh/3271.html https://zulin.tiancebbs.cn/sh/4757.html https://changshushi.tiancebbs.cn/hjzl/464014.html https://su.tiancebbs.cn/hjzl/462145.html https://zulin.tiancebbs.cn/sh/3833.html

2楼 fupeng
2021-05-12 17:43:12+08

大善

© 2010 PostgreSQL中文社区