PostgreSQL中的并行查询 原作者:Nickolay Ihalainen 创作时间:2019-02-28 15:52:37+08 |
redraiment 发布于2019-02-28 15:52:37
![]() ![]() ![]() ![]() ![]() |
作译者简介
原文链接: https://www.percona.com/blog/2019/02/21/parallel-queries-in-postgresql/
作者:Nickolay Ihalainen
Nickolay于2010年12月加入Percona,此前他曾在俄罗斯最受欢迎的电影院工作多年。 在他访问期间,Nickolay和一小组开发人员负责将网站扩展为现在每天为超过一百万的独立访问者提供服务的网站。 在此之前,他曾在其他几家公司工作,包括提供ISP计费和IPTV解决方案的NetUp,以及俄罗斯最古老的电子商务公司eHouse。 Nickolay在系统管理和编程方面拥有丰富的经验。 他的经验包括广泛的实践工作和广泛的技术,包括SQL,MySQL,PHP,C,C ++,Python,Java,XML,OS参数调整(Linux,Solaris),缓存技术(例如,memcached),RAID ,文件系统,SMTP,POP3,Apache,网络和网络数据格式等等。 他是可扩展性,性能和系统可靠性方面的专家。
翻译:张文升
中国开源软件推进联盟PostgreSQL分会核心成员之一,《PostgreSQL实战》作者之一。常年活跃于PostgreSQL、MySQL、Redis等开源技术社区,坚持推动PostgreSQL在中国地区的发展,多次参与组织PostgreSQL全国用户大会。近年来致力于推动PostgreSQL在互联网企业的应用以及企业PostgreSQL培训与技术支持
一、引言
现代CPU的内核数量愈发可观。多年来,应用程序一直在向数据库并行发送查询。如果查询处理许多行数据,使用多核的能力有助于查询更快地执行。PostgreSQL中的并行查询允许我们利用多核来更快地完成报表查询(reporting queries)。并行查询功能在9.6中实现,从PostgreSQL 9.6开始,报表查询能够并行使用多核,高效完成查询。
并行查询执行的初始实现花了三年时间。并行的支持需要在许多查询执行阶段进行代码更改。PostgreSQL 9.6为进一步的代码改进创建了一个基础架构。更高版本则扩展了对其他查询类型的并行执行支持。
二、限制
- 如果所有CPU核心已经饱和,请不要启用并行执行。并行执行会从其他查询中窃取CPU时间,并增加响应时间。
- 最重要的是,并行处理显着增高WORKMEM的内存使用量,因为每个散列连接或排序操作都需要一个WORKMEM内存量。
- 通过并行执行,无法更快地实现低延迟OLTP查询。特别是,当启用并行执行时,返回单个行的查询可能会执行得很糟糕。
- 并行执行仅支持没有锁谓词的SELECT查询。
- 正确的索引可能是并行顺序表扫描的更好替代方案。
- 不支持游标或暂停查询。
- 窗口函数和有序集合函数是非平行的。
- IO绑定工作负载没有任何好处。
- 没有并行排序算法。但是在某些方面,具有排序的查询仍然可以是并行的。
- 用子查询替换CTE(WITH …)以支持并行执行。
- FDW目前不支持并行执行(但它们可以!)
- 不支持FULL OUTER JOIN。
- 设置max_rows的客户端禁用并行执行。
- 如果查询使用未标记为PARALLEL SAFE的函数,则它将是单线程的。
- SERIALIZABLE事务隔离级别禁用并行执行。
三、测试环境
PostgreSQL开发团队试图改进TPC-H基准查询的响应时间。 您可以使用这些说明下载基准并使其适应PostgreSQL。这不是使用TPC-H基准测试的官方方式,因此您不应该使用它来比较不同的数据库或硬件。
- 从官方TPC网站下载TPC-HToolsv2.17.3.zip(或更新版本)。
- 将makefile.suite重命名为Makefile,并按照 https://github.com/tvondra/pg_tpch 的要求对其进行修改。使用make命令编译代码
- 生成数据:
./dbgen -s 10
生成 23GB 数据库,足以看到并行和非并行查询的性能差异。 - 使用
for + sed
将tbl文件转换为csv - 克隆pgtpch存储库并将csv文件复制到
pgtpch/dss/data
- 使用qgen命令生成查询
- 使用
./tpch.sh
命令将数据加载到数据库。
四、并行顺序扫描
这可能更快,不是因为并行读取,而是由于跨越许多CPU核的数据分布。现代操作系统为PostgreSQL数据文件提供了良好的缓存。预读允许从存储中获取块,而不仅仅是PG守护程序请求的块。因此,查询性能不受磁盘IO的限制。它消耗CPU周期:
- 从表数据页中逐个读取行
- 比较行值和WHERE条件
让我们尝试执行简单的选择查询:
tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms
顺序扫描会产生太多行而不会聚合。因此,查询由单个CPU核执行。 添加SUM()之后,很明显可以看到两个worker进程将帮助我们更快地进行查询:
explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms
与普通的单线程选择相比,更复杂的查询速度提高了2.2倍。
并行聚合
“Parallel Seq Scan”节点生成用于部分聚合的行。“Partial Aggregate”节点使用 SUM() 减少这些行。最后,每个worker进程的SUM计数器由“Gather”节点收集。 最终结果由“Finalize Aggregate”节点计算。如果您有自己自定义的聚合函数,请不要忘记将它们标记为“parallel safe”。
worker数量
我们可以不重启服务器的情况下增加worker数量
alter system set max_parallel_workers_per_gather=4;
select * from pg_reload_conf();
现在,从Explain的输出中可见有4个workers节点:
tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------
Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1)
-> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 229267
Planning Time: 0.218 ms
Execution Time: 5153.967 ms
这里发生了什么事?我们已将worker数量从2更改为4,查询速度提高了1.6599倍。实际上提升是明显的,我们有两个worker加一个gather节点。配置更改后,它变为4 + 1。我们可以实现的并行执行的最大提升是:5/3 = 1.66倍。
它是如何工作的?
流程
查询执行始终在查询主进程中启动。主进程执行所有非并行活动及其对并行处理的贡献。执行相同查询的其他进程称为“worker”进程。 并行执行使用Dynamic Background Workers基础结构(在PG 9.4中增加)。由于PostgreSQL的其他部分使用进程而不是线程,因此创建三个工作进程的查询可能比传统执行快4倍。
通讯
worker使用消息队列(基于共享内存)与查询主进程进行通信。每个worker进程都有两个队列:一个用于处理错误,一个用于处理元组。
五、应该使用多少worker?
首先,max_parallel_workers_per_gather
参数是worker数量的最小限制。其次,查询执行程序从max_parallel_workers
大小限制的池中获取worker。最后,最顶层的限制是max_worker_processes
,它用来限制后台进程的总数。 工作分配失败会导致单进程执行。 查询计划可以根据表或索引大小减少worker数。min_parallel_table_scan_size
和min_parallel_index_scan_size
控制这一行为。
set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
每次表比minparallel(index | table)scansize
大3倍时,postgres会添加一个worker。worker数量不是以cost为基础的!循环依赖使得复杂的实现变得困难。相反,查询规划使用简单的规则。
实际上,这些规则在生产中并不总是可以接受,您可以使用ALTER TABLE ... SET(parallel_workers = N)
覆盖特定表的worker数量。
六、为什么不使用并行执行?
除了很长的并行执行限制列表外,PostgreSQL还会检查查询成本:parallel_setup_cost
:以避免短查询的并行执行。它模拟了内存设置,进程启动和初始通信所花费的时间parallel_tuple_cost
:查询主进程和worker之间的通信可能需要很长时间。时间与worker发送的元组数成正比。该参数模拟通信成本。
Nested loop joins
由于操作简单,PostgreSQL 9.6+可以并行执行“Nested loop joins”。
explain (costs off) select c_custkey, count(o_orderkey)
from customer left outer join orders on
c_custkey = o_custkey and o_comment not like '%special%deposits%'
group by c_custkey;
QUERY PLAN
----------------------------------------------------
Finalize GroupAggregate
Group Key: customer.c_custkey
-> Gather Merge
Workers Planned: 4
-> Partial GroupAggregate
Group Key: customer.c_custkey
-> Nested Loop Left Join
-> Parallel Index Only Scan using customer_pkey on customer
-> Index Scan using idx_orders_custkey on orders
Index Cond: (customer.c_custkey = o_custkey)
Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
聚合发生在最后一个阶段,因此“Nested Loop Left Join”是一个并行操作。从版本10开始PostgreSQL提供了“Parallel Index Only Scan”。它的作用方式与并行顺序扫描类似。c_custkey = o_custkey
条件为每个客户行读取单个订单。 因此它并不平行。
Hash Join
在PostgreSQL 11之前每个worker都构建自己的哈希表,4个以上的worker无法线性提高性能。新实现使用共享哈希表。每个worker都可以使用WORK_MEM来构建哈希表。
select
l_shipmode,
sum(case
when o_orderpriority = '1-URGENT'
or o_orderpriority = '2-HIGH'
then 1
else 0
end) as high_line_count,
sum(case
when o_orderpriority <> '1-URGENT'
and o_orderpriority <> '2-HIGH'
then 1
else 0
end) as low_line_count
from
orders,
lineitem
where
o_orderkey = l_orderkey
and l_shipmode in ('MAIL', 'AIR')
and l_commitdate < l_receiptdate
and l_shipdate < l_commitdate
and l_receiptdate >= date '1996-01-01'
and l_receiptdate < date '1996-01-01' + interval '1' year
group by
l_shipmode
order by
l_shipmode
LIMIT 1;
QUERY PLAN
----------------------------------------------------
Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
-> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
Group Key: lineitem.l_shipmode
-> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
Group Key: lineitem.l_shipmode
-> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
Sort Key: lineitem.l_shipmode
Sort Method: external merge Disk: 2304kB
Worker 0: Sort Method: external merge Disk: 2064kB
Worker 1: Sort Method: external merge Disk: 2384kB
Worker 2: Sort Method: external merge Disk: 2264kB
Worker 3: Sort Method: external merge Disk: 2336kB
-> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
Rows Removed by Filter: 11934691
-> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
Buckets: 65536 Batches: 256 Memory Usage: 3840kB
-> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
Planning Time: 0.977 ms
Execution Time: 7923.770 ms
来自TPC-H的Query 12是并行hash join很好的示例,每个worker都有助于构建共享哈希表。
Merge Join
由于merge join的性质,不可能使其并行。如果它是查询执行的最后一个阶段,请不要担心 - 您仍然可以看到具有merge join的查询的并行执行。
-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from part, supplier, partsupp, nation, region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and p_size = 36
and p_type like '%BRASS'
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'AMERICA'
and ps_supplycost = (
select
min(ps_supplycost)
from partsupp, supplier, nation, region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'AMERICA'
)
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
QUERY PLAN
----------------------------------------------------
Limit
-> Sort
Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
-> Merge Join
Merge Cond: (part.p_partkey = partsupp.ps_partkey)
Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
-> Gather Merge
Workers Planned: 4
-> Parallel Index Scan using <strong>part_pkey</strong> on part
Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
-> Materialize
-> Sort
Sort Key: partsupp.ps_partkey
-> Nested Loop
-> Nested Loop
Join Filter: (nation.n_regionkey = region.r_regionkey)
-> Seq Scan on region
Filter: (r_name = 'AMERICA'::bpchar)
-> Hash Join
Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
-> Seq Scan on supplier
-> Hash
-> Seq Scan on nation
-> Index Scan using idx_partsupp_suppkey on partsupp
Index Cond: (ps_suppkey = supplier.s_suppkey)
SubPlan 1
-> Aggregate
-> Nested Loop
Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
-> Seq Scan on region region_1
Filter: (r_name = 'AMERICA'::bpchar)
-> Nested Loop
-> Nested Loop
-> Index Scan using idx_partsupp_partkey on partsupp partsupp_1
Index Cond: (part.p_partkey = ps_partkey)
-> Index Scan using supplier_pkey on supplier supplier_1
Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
-> Index Scan using nation_pkey on nation nation_1
Index Cond: (n_nationkey = supplier_1.s_nationkey)
“Merge Join”节点位于“Gather Merge”上方。因此合并不使用并行执行。 但“Parallel Index Scan”节点仍然有助于part_pkey段。
Partition-wise join
PostgreSQL 11默认禁用partition-wise join功能。partition-wise join具有较高的计划成本,类似分区表的连接可以逐个分区完成,这允许postgres使用较小的哈希表.每个分区连接操作可以并行执行。
tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
QUERY PLAN
----------------------------------------------------
Append
-> Hash Join
Hash Cond: (t2.b = t1.a)
-> Seq Scan on prt2_p1 t2
Filter: ((b >= 0) AND (b <= 10000))
-> Hash
-> Seq Scan on prt1_p1 t1
Filter: (b = 0)
-> Hash Join
Hash Cond: (t2_1.b = t1_1.a)
-> Seq Scan on prt2_p2 t2_1
Filter: ((b >= 0) AND (b <= 10000))
-> Hash
-> Seq Scan on prt1_p2 t1_1
Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
QUERY PLAN
----------------------------------------------------
Gather
Workers Planned: 4
-> Parallel Append
-> Parallel Hash Join
Hash Cond: (t2_1.b = t1_1.a)
-> Parallel Seq Scan on prt2_p2 t2_1
Filter: ((b >= 0) AND (b <= 10000))
-> Parallel Hash
-> Parallel Seq Scan on prt1_p2 t1_1
Filter: (b = 0)
-> Parallel Hash Join
Hash Cond: (t2.b = t1.a)
-> Parallel Seq Scan on prt2_p1 t2
Filter: ((b >= 0) AND (b <= 10000))
-> Parallel Hash
-> Parallel Seq Scan on prt1_p1 t1
Filter: (b = 0)
最重要的是,只有当分区足够大时,partition-wise join才能使用并行执行。
Parallel Append
Parallel append分区工作,而不是在不同的worker程序中使用不同的块。通常,您可以使用UNION ALL查询来查看此内容。缺点 - 较少的并行性,因为每个worker最终都可以为单个查询工作。
即使配置使用了四个worker,也只能启动两个worker。
tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------
Gather
Workers Planned: 2
-> Parallel Append
-> Aggregate
-> Seq Scan on lineitem
Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
-> Aggregate
-> Seq Scan on lineitem lineitem_1
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
最重要的变量
- WORKMEM限制每个进程的内存使用量!不仅仅是查询:
workmem * processes * joins =>
可能导致大量内存使用。 - maxparallelworkerspergather - 执行程序将用于并行执行计划程序节点的worker程序数
- maxworkerprocesses - 根据服务器上安装的CPU核心数量调整worker总数
- maxparallelworkers - 并行worker数量
七、摘要
从PG 9.6开始执行并行查询可以显着提高扫描许多行或索引记录的复杂查询的性能。在PostgreSQL 10中,默认情况下启用了并行执行。不要忘记在具有大量OLTP工作负载的服务器上禁用并行执行。顺序扫描或索引扫描仍然消耗大量资源。如果您没有针对整个数据集运行reporting query,则可以通过添加缺失索引或使用适当的分区来提高查询性能。
八、参考
- https://www.postgresql.org/docs/11/how-parallel-query-works.html
- https://www.postgresql.org/docs/11/parallel-plans.html
- http://ashutoshpg.blogspot.com/2017/12/partition-wise-joins-divide-and-conquer.html
- http://rhaas.blogspot.com/2016/04/postgresql-96-with-parallel-query-vs.html
- http://amitkapila16.blogspot.com/2015/11/parallel-sequential-scans-in-play.html
- https://write-skew.blogspot.com/2018/01/parallel-hash-for-postgresql.html
- http://rhaas.blogspot.com/2017/03/parallel-query-v2.html
- https://blog.2ndquadrant.com/parallel-monster-benchmark/
- https://blog.2ndquadrant.com/parallel-aggregate/
- https://www.depesz.com/2018/02/12/waiting-for-postgresql-11-support-parallel-btree-index-builds/ PostgreSQL 11中的并行性
图片由Nathan Gonthier和Pavel Nekoranec在Unsplash上的照片编辑而成。
九、《PostgreSQL实战》推荐
最后推荐和谭峰共同编写的《PostgreSQL实战》,本书基于PostgreSQL 10 编写,共18章,重点介绍SQL高级特性、并行查询、分区表、物理复制、逻辑复制、备份恢复、高可用、性能优化、PostGIS等,涵盖大量实战用例!
链接:https://item.jd.com/12405774.html
请在登录后发表评论,否则无法保存。