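About the Author

Lottu (乐途) works as a database administrator (DBA) at Shenzhen Yisou Technology Co., Ltd. (深圳宜搜科技有限公司), and is mainly responsible for maintaining PostgreSQL and Oracle databases and for migrating workloads away from Oracle.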

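Background

Kafka is a distributed publish-subscribe messaging system. It scales out well horizontally, can store massive volumes of data in real time, and is the de facto standard middleware for stream processing. When a stream-processing pipeline is built with Kafka and Greenplum, the question users care about most is how to load streaming data quickly and reliably. Starting with version 5.10, Greenplum ships a new tool, gpkafka, which gives Greenplum the ability to load streaming data.

The gpkafka tool: Kafka ---> Greenplum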

I. Installation Prerequisites

Kafka: version kafka_2.11-2.1.0. Greenplum: version 5.16.

II. Importing Kafka Data into Greenplum

1. Start Kafka

# Start ZooKeeper

$ /opt/zookeeper-3.4.12/bin/zkServer.sh start

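# Start Kafka (the config path is given in full so the command works from any directory)

$ /opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.11-2.1.0/config/server.properties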

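2. Create the gpss extension

Before loading Kafka message data into Greenplum Database, you must register the Greenplum-Kafka integration formatter functions in every database that contains a table Kafka data will be written to. This example uses the lottu database.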

[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# CREATE EXTENSION gpss;

3. Create the sample table

The Kafka messages are in JSON format. A sample record:

{
    "time":1550198435941,
    "type":"type_mobileinfo",
    "phone_imei":"861738033581011",
    "phone_imsi":"",
    "phone_mac":"00:27:1c:95:47:09",
    "appkey":"307A5C626E6C2F6472636E6E6A2F736460656473",
    "phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2",
    "phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9",
    "appUdid":"D21C76419E54B18DDBB94BF2E6990183",
    "phone_resolution":"1280*720",
    "phone_apn":"",
    "phone_model":"BF T26",
    "phone_firmware_version":"5.1",
    "phone_softversion":"3.19.0",
    "phone_softname":"com.esbook.reader",
    "sdk_version":"3.1.8",
    "cpid":"blp1375_13621_001",
    "currentnetworktype":"wifi",
    "phone_city":"",
    "os":"android",
    "install_path":"\/data\/app\/com.esbook.reader-1\/base.apk",
    "last_cpid":"",
    "package_name":"com.esbook.reader",
    "src_code":"WIFIMAC:00:27:1c:95:47:09"
}

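From each record I need the fields package_name, appkey, time, phone_udid, os, idfa, phone_imei, cpid, last_cpid, and phone_number (time is stored in the ts column), so the table is created with the following statement: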

CREATE TABLE tbl_novel_mobile_log (
  package_name text,
  appkey text,
  ts bigint,
  phone_udid text,
  os character varying(20),
  idfa character varying(64),
  phone_imei character varying(20),
  cpid text,
  last_cpid text,
  phone_number character varying(20)
) ;

4. Create the gpkafka.yaml configuration file

Contents of the gpkafka_mobile_yaml file:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: kafkaip:9092
      TOPIC: mobile_info
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT:
    TABLE: tbl_novel_mobile_log
    MAPPING:
      - NAME: package_name
        EXPRESSION: (jdata->>'package_name')::text
      - NAME: appkey
        EXPRESSION: (jdata->>'appkey')::text
      - NAME: ts
        EXPRESSION: (jdata->>'time')::bigint
      - NAME: phone_udid
        EXPRESSION: (jdata->>'phone_udid')::text
      - NAME: os
        EXPRESSION: (jdata->>'os')::text
      - NAME: idfa
        EXPRESSION: (jdata->>'idfa')::text
      - NAME: phone_imei
        EXPRESSION: (jdata->>'phone_imei')::text
      - NAME: cpid
        EXPRESSION: (jdata->>'cpid')::text
      - NAME: last_cpid
        EXPRESSION: (jdata->>'last_cpid')::text
      - NAME: phone_number
        EXPRESSION: (jdata->>'phone_number')::text
  COMMIT:
    MAX_ROW: 1000
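
To make the MAPPING section easier to follow, here is a rough Python analogy of what each EXPRESSION does to one Kafka message. It is only a sketch for illustration, not how gpkafka is implemented: the names COLUMN_TO_KEY and map_record are hypothetical, and the `->>` operator is approximated with a dictionary lookup that yields None (SQL NULL) for a missing key.

```python
import json

# Target column -> JSON key, mirroring the MAPPING entries.
# Only "ts" differs from its source key ("time").
COLUMN_TO_KEY = {
    "package_name": "package_name",
    "appkey": "appkey",
    "ts": "time",
    "phone_udid": "phone_udid",
    "os": "os",
    "idfa": "idfa",
    "phone_imei": "phone_imei",
    "cpid": "cpid",
    "last_cpid": "last_cpid",
    "phone_number": "phone_number",
}


def map_record(message: str) -> dict:
    """Approximate the MAPPING expressions for one message (illustrative only)."""
    jdata = json.loads(message)
    row = {}
    for column, key in COLUMN_TO_KEY.items():
        value = jdata.get(key)  # a missing key gives None, like NULL
        if value is not None:
            value = str(value)  # ->> yields the value as text
            if column == "ts":
                value = int(value)  # stands in for the ::bigint cast
        row[column] = value
    return row


sample = '{"time":1550198435941,"os":"android","last_cpid":"","package_name":"com.esbook.reader"}'
row = map_record(sample)
print(row["ts"], row["os"], repr(row["last_cpid"]), row["idfa"])
```

The sample records contain no idfa key, which is why that column comes out empty in the loaded table, whereas last_cpid is present as an empty string.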

5. Create the mobile_info topic

/opt/kafka/kafka_2.11-2.1.0/bin/kafka-topics.sh --create --zookeeper kafkaip:2181 --replication-factor 1 --partitions 1 --topic mobile_info

6. Start a Kafka producer

Run the following command, then enter the Kafka records at the prompt:

[root@oracle166 ~]# /opt/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list kafkaip:9092 --topic mobile_info

>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"Le X620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader-1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUS A6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-TlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINA MOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-NBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPO A33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MI MAX 2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}
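
Records typed into the console producer are sent as they are, so a malformed line would only surface later, when the load job tries to parse it as JSON (and would presumably count toward ERROR_LIMIT). A minimal pre-check sketch in Python, assuming the records are held one JSON document per line; the helper name split_valid_json is hypothetical and is not part of Kafka or gpkafka:

```python
import json


def split_valid_json(lines):
    """Return (valid, invalid); invalid holds (line_number, error) pairs."""
    valid, invalid = [], []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # skip blank lines
        try:
            json.loads(text)
        except json.JSONDecodeError as exc:
            invalid.append((number, str(exc)))
        else:
            valid.append(text)
    return valid, invalid


records = [
    '{"time":1550198435941,"os":"android","package_name":"com.esbook.reader"}',
    '{"time":1550198437885,"os":"android"',  # deliberately truncated
]
valid, invalid = split_valid_json(records)
print(len(valid), "valid;", "bad line numbers:", [n for n, _ in invalid])
```

Only the lines in valid would then be pasted into the producer.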

7. Run gpkafka to load the data

[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yaml
PartitionID StartTime EndTime BeginOffset EndOffset
0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC 

8. Check the progress of the load operation (optional)

[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yaml
PartitionID StartTime EndTime BeginOffset EndOffset
0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5

9. View the table data

[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.

lottu=# select * from tbl_novel_mobile_log ;
   package_name    |                  appkey                  |      ts       |            phone_udid            |   os    | idfa |   phone_imei    |       cpid        | last_cpid | phone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+-------------------+-----------+--------------
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android |      | 861738033581011 | blp1375_13621_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android |      | 867520045576831 | blf1298_12242_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android |      | 860364049874919 | blf1298_12242_001 |           | 15077113477
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android |      | 869800021106037 | blp1375_14526_003 |           |
 cn.wejuan.reader  | 307A5C626F2F76646B74606F2F736460656473   | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android |      | 862245038046551 | blf1298_14411_001 |           |
(5 rows)
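
The ts column keeps the raw time value from each message as a bigint. Assuming those values are Unix epoch timestamps in milliseconds (the article does not state the unit, so this is an assumption), they can be turned into readable timestamps along these lines; the function name ms_to_utc is hypothetical:

```python
from datetime import datetime, timezone


def ms_to_utc(ts_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    seconds, millis = divmod(ts_ms, 1000)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.replace(microsecond=millis * 1000)


# First row of the loaded table
print(ms_to_utc(1550198435941).isoformat())
```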

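III. Afterword

I wrote this article because the company planned an ad campaign for the Beijing ES novel product and needed something similar to the Reyun (热云) data platform to back the campaign with data and make ad delivery more precise and reliable. The Beijing novel team's data is stored in Kafka and had to be imported into the back-end database in Shenzhen. Although the platform did not adopt gpkafka in the end, it remains a viable option. For various reasons PostgreSQL 9.6 was chosen as the back-end database, and Java code was written to import the Kafka data into PostgreSQL in real time. Finally, best wishes to PostgreSQL and Greenplum, and I look forward to the arrival of a pgkafka tool.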

IV. References

1. More on using gpkafka

https://gpdb.docs.pivotal.io/5120/greenplum-kafka/intro.html

2. BottledWater-PG: a real-time data exchange platform that integrates PostgreSQL with Kafka

https://www.jianshu.com/p/c3659f49bf94
