大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(2)
- IT业界
- 2025-08-28 09:45:02

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)
我们上次已经了解了Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义
大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)今天,我们继续快速了解下最近比较火的Apache Paimon:
官方文档: paimon.apache.org/docs/1.0/推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世 1 利用Paimon做维表join 在流式处理中,Lookup Join用来从另一个表(Paimon)中“查字典”来补充流的信息。一个表需要有时间标记(处理时间属性),另一个表需要支持快速查找(查找源连接器)Paimon在Flink中支持对带有主键的表和追加表进行Lookup Join操作 1.1 常规Lookup SET 'execution.runtime-mode' = 'streaming'; -- 1、用户维表 CREATE TABLE customers ( id INT PRIMARY KEY NOT ENFORCED, name STRING comment '姓名', country STRING comment '城市', zip STRING comment '邮编' ) WITH ( 'connector' = 'paimon' ); -- 插入维表数据 INSERT INTO customers VALUES (1,'tom','伦敦','123') ,(2,'hank','纽约','456') ,(3,'小明','北京','789'); -- 2、模拟订单表(数据流) drop TEMPORARY TABLE orders_info; CREATE TEMPORARY TABLE orders_info ( order_id INT, total INT, customer_id INT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.order_id.kind'='sequence', 'fields.order_id.start'='1', 'fields.order_id.end'='1000000', 'fields.total.kind'='random', 'fields.total.min'='1', 'fields.total.max'='1000', 'fields.customer_id.kind'='random', 'fields.customer_id.min'='1', 'fields.customer_id.max'='3' ); -- 3、维表join SELECT o.order_id , o.total , c.country , c.zip FROM orders_info AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; -- 可以看到下面的结果 +----+----------+----------+-------------+---------+ | op | order_id | total | country | zip | +----+----------+----------+-------------+---------+ | +I | 1 | 179 | 纽约 | 456 | | +I | 2 | 31 | 北京 | 789 | | +I | 3 | 148 | 北京 | 789 | | +I | 4 | 774 | 北京 | 789 | 1.2 同步重试Lookup 如果用户维表(查找表)的数据未准备好,导致订单表(主表)的记录无法完成连接,你可以考虑使用Flink的延迟重试策略进行查找。这个功能仅适用于Flink 1.16及以上的版本。下面sql的提示(hints)设置了重试策略: 'table'='c': 指定了应用查找重试策略的目标表的别名,在这个例子中是customers表,其别名为c。'retry-predicate'='lookup_miss': 定义了触发重试的条件。这里的条件是lookup_miss,意味着如果在查找过程中没有找到对应的数据(即查找缺失),就会触发重试机制。'retry-strategy'='fixed_delay': 设置了重试的策略为固定延迟,即每次重试之间会有固定的等待时间。'fixed-delay'='1s': 当采用固定延迟重试策略时,这里设定了每次重试前等待的时间长度为1秒。'max-attempts'='600': 设定最大重试次数为600次。结合上面的fixed-delay设置,这意味着系统会每隔1秒尝试一次数据查找,最多尝试600次。 -- enrich each order with customer information SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */ o.order_id , o.total , c.country , c.zip FROM orders AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; 1.3 异步重试Lookup 在同步重试机制下,如果处理某一条记录时遇到了问题(例如查找失败),系统会按照设定的重试策略反复尝试直到成功或者达到最大重试次数。在这期间,这条记录后面的其他记录即使没有问题也无法被处理,因为整个处理流程被阻塞了。这会导致数据处理的整体效率低下,甚至可能造成作业延迟或超时。使用异步加上allow_unordered,可以在某些记录在查找时缺失也不会再阻塞其他记录。下面sql配置项解释: 'output-mode'='allow_unordered': 设置输出模式允许无序,意味着当查找失败时不会阻塞其他记录的处理,适合于不需要严格顺序的场景。'lookup.async'='true': 启用异步查找,提高效率,避免因等待某个查找结果而阻塞其它查找操作。'lookup.async-thread-number'='16': 设置用于异步查找的线程数为16,这可以加速查找过程,特别是在高并发情况下。 注意:如果主表(orders)是CDC流,allow_unordered会被Flink SQL忽略(只支持追加流),流作业可能会被阻塞。可以尝试使用Paimon的audit_log系统表功能来绕过这个问题(将CDC流转为追加流)。 SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */ o.order_id , o.total , c.country , c.zip FROM orders AS o JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */ FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; 1.4 max_pt功能在传统的数据仓库中,每个分区通常维护最新的完整数据,因此这种分区表只需要连接最新的分区即可。Paimon特别为此场景开发了max_pt功能。
通过max_pt,Lookup节点能够自动刷新并查询最新分区的数据,确保所使用的客户信息始终是最新的,而不需要手动干预来确定或切换到最新的数据分区。
这种方法特别适用于需要频繁更新和查询最新数据的场景,可以大大提高数据处理的效率和准确性。
-- 1、分区用户维表 drop table if exists customers; CREATE TABLE customers ( id INT, name STRING, country STRING, zip STRING, dt STRING, PRIMARY KEY (id, dt) NOT ENFORCED ) PARTITIONED BY (dt); -- 插入数据(维表关联时候,只查找'2025-02-19'分区数据) INSERT INTO customers VALUES (1, 'Alice', 'USA', '10001', '2025-02-18'), (2, 'Bob', 'UK', '20002', '2025-02-18'), (3, 'Charlie', 'Germany', '30003', '2025-02-18'), (1, 'Alice', 'USA', '10002', '2025-02-19'), -- 更新了 Alice 的邮编 (4, 'David', 'France', '40004', '2025-02-19'); -- 2、订单信息表(解析kafka数据) CREATE TEMPORARY TABLE orders ( order_id BIGINT, customer_id INT, total DECIMAL(10, 2), proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'properties.group.id' = 'testordersGroup', 'scan.startup.mode' = 'earliest-offset', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); -- 创建topic /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_topic --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 -- 启动命令行生产者,生产数据 /opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_topic --bootstrap-server centos01:9092 {"order_id":1001,"customer_id":1,"total":50.0} {"order_id":1002,"customer_id":2,"total":30.0} {"order_id":1004,"customer_id":4,"total":20.0} -- 'lookup.dynamic-partition'='max_pt()': 这个选项指示查找节点自动定位并使用最新(最大)的分区。max_pt()函数帮助系统识别最新的分区,确保只查询最新的数据。 -- 'lookup.dynamic-partition.refresh-interval'='1 h': 设置了查找节点刷新最新分区的时间间隔为1小时。这意味着系统会每隔1小时检查一次是否有新的分区可用,并自动更新到最新分区的数据。 SELECT o.order_id , o.total , c.country , c.zip FROM orders AS o JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */ FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; +----+----------------------+--------------+--------------------------------+--------------------------------+ | op | order_id | total | country | zip | +----+----------------------+--------------+--------------------------------+--------------------------------+ | +I | 1001 | 50.00 | USA | 10002 | | +I | 1004 | 20.00 | France | 40004 |注:
可以运行一个Flink流式作业来启动针对该paimon维表的查询服务。当QueryService存在时,Flink查找连接(Lookup Join)会优先从该服务获取数据,这将有效提高查询性能。
可以通过调用sys.query_service系统函数来实现: CALL sys.query_service('paimon_db.customers', 4); -- 设置并行度为4或者通过下面action包开启
<FLINK_HOME>/bin/flink run \ /path/to/paimon-flink-action-1.0.0.jar \ query_service \ --warehouse <warehouse-path> \ --database <database-name> \ --table <table-name> \ [--parallelism <parallelism>] \ [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]查询服务能够在内存中缓存频繁访问的数据,并以高并发的方式提供这些数据,减少了磁盘I/O操作和网络延迟的影响。
2 集成Mysql CDC可以通过 Flink SQL 或者 Flink DataStream API 将 Flink CDC 数据写入 Paimon 中,也可以通过Paimon 提供的 CDC 工具来完成入湖。那这两种方式有什么区别呢?
上图是使用 Flink SQL 来完成入湖,简单,但是当源表添加新列后,同步作业不会同步新的列,下游 Paimon 表也不会增加新列。
上图是使用 Paimon CDC 工具来同步数据,可以看到,当源表发生列的新增后,流作业会自动新增列的同步,并传导到下游的 Paimon 表中,完成 Schema Evolution 的同步。
Paimon CDC 工具也提供了整库同步:
一个作业同步多张表,以低成本的方式同步大量小表作业里同时自动进行 Schema Evolution新表将会被自动进行同步,你不用重启作业,全自动完成推荐阅读:Flink + Paimon 数据 CDC 入湖最佳实践
2.1 MySQL一张表同步到Paimon一张表 2.1.1 环境准备 # mysql-cdc相关jar包的下载地址,同时还需要mysql-connector repo.maven.apache.org/maven2/org/apache/flink/flink-connector-mysql-cdc/3.1.1/ repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/ repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-common/3.1.1/ # 在flink的lib目录添加下面的jar包 [root@centos01 lib]# ll /opt/apps/flink-1.16.0/lib/ -rw-r--r--. 1 root root 259756 Feb 19 15:13 flink-cdc-common-3.1.1.jar -rw-r--r--. 1 root root 21286022 Feb 19 11:40 flink-cdc-pipeline-connector-mysql-3.1.1.jar -rw-r--r--. 1 root root 388680 Feb 19 10:57 flink-connector-mysql-cdc-3.1.1.jar -rw-r--r--. 1 root root 2475087 Feb 19 10:58 mysql-connector-java-8.0.27.jar ...... # 开启MySQL Binlog并重启MySQL [root@centos01 ~]# vim /etc/my f #添加如下配置信息,开启`cdc_db`数据库的Binlog #数据库id server-id = 1 ##启动binlog,该参数的值会作为binlog的文件名 log-bin=mysql-bin #binlog类型 binlog_format=row ##启用binlog的数据库,需根据实际情况作出修改 binlog-do-db=cdc_db [root@centos01 ~]# systemctl restart mysqld # 启动hadoop和yarn [root@centos01 ~]# start-all.sh # 启动Hive [root@centos01 ~]# nohup hive --service metastore 2>&1 & [root@centos01 ~]# nohup hive --service hiveserver2 2>&1 & # yarn-session模式 # http://centos01:8088/cluster中可以看到Flink session cluster的job ID [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d # 启动Flink的sql-client [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session 使用Hive的元数据 Flink SQL> CREATE CATALOG paimon_hive_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://centos01:9083' ); Flink SQL> USE CATALOG paimon_hive_catalog; Flink SQL> create database if not exists paimon_db; Flink SQL> use paimon_db; # 设置显示模式 Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; Flink SQL> SET 'execution.runtime-mode' = 'batch'; 2.1.2 案例详解 如果指定的Paimon表不存在,将自动创建表。其schema将从所有指定的MySQL表派生;如果Paimon 表已存在,则其schema将与所有指定MySQL表的schema进行比较;仅支持同步具有主键的MySQL表;也可MySQL多张表同步到Paimon一张表,可以查看官网示例。 -- 准备测试数据 DROP TABLE IF EXISTS `user_info`; CREATE TABLE `user_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `login_name` varchar(200) DEFAULT NULL COMMENT '用户名称', `name` varchar(200) DEFAULT NULL COMMENT '用户姓名', `phone_num` varchar(200) DEFAULT NULL COMMENT '手机号', `birthday` date DEFAULT NULL COMMENT '用户生日', `gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `operate_time` datetime DEFAULT NULL COMMENT '修改时间', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC; -- ---------------------------- -- Records of user_info -- ---------------------------- INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES ('zhangsan', '张三', '13800001234', '1990-01-01', 'M', NOW(), NOW()), ('lisi', '李四', '13800005678', '1992-02-02', 'F', NOW(), NOW()), ('wangwu', '王五', '13800009012', '1995-03-03', 'M', NOW(), NOW()), ('zhaoliu', '赵六', '13800003456', '1998-04-04', 'F', NOW(), NOW()), ('sunqi', '孙七', '13800007890', '2000-05-05', 'M', NOW(), NOW()); CREATE TABLE `orders` ( `order_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单编号', `user_id` bigint(20) NOT NULL COMMENT '用户编号', `order_date` datetime DEFAULT NULL COMMENT '订单日期', `total_price` decimal(10,2) DEFAULT NULL COMMENT '订单总价', PRIMARY KEY (`order_id`) USING BTREE, KEY `idx_user_id` (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC; INSERT INTO `orders`(`user_id`, `order_date`, `total_price`) VALUES (1001, '2025-02-18 14:32:10', 199.99), (1002, '2025-02-19 09:15:30', 299.50), (1001, '2025-02-19 12:47:05', 79.99), (1003, '2025-02-19 13:00:00', 499.00), (1004, '2025-02-20 08:20:25', 159.99); # 按照天进行分区 [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \ /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \ mysql-sync-table \ --warehouse hdfs://centos01:8020/user/hive/warehouse \ --database paimon_db \ --table ods_user_info_cdc \ --primary-keys pt,id \ # 指定分区键,分区键是通过函数转换而来 --partition_keys pt \ --computed_column 'pt=date_format(operate_time, yyyyMMdd)' \ --mysql-conf hostname=centos01 \ --mysql-conf username=root \ --mysql-conf password=123456 \ --mysql-conf database-name=cdc_db \ --mysql-conf table-name='user_info' \ --catalog-conf metastore=hive \ --catalog-conf uri=thrift://centos01:9083 \ --table-conf bucket=2 \ --table-conf changelog-producer=input \ --table-conf sink.parallelism=2 我们可以进行测试了 -- 此时会自动创建paimon表(无需我们手动建表了) Flink SQL> select * from ods_user_info_cdc; +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+ | id | login_name | name | phone_num | birthday | gender | create_time | operate_time | pt | +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+ | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+ -- 修改原始表的schema mysql> ALTER TABLE user_info ADD COLUMN email varchar(255); mysql> INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`, `email`) values('hank', '汉克', '18600007890', '2000-05-05', 'M', NOW(), NOW(), 'hank@163 '); -- 注意:我们此时在原始sql-client窗口查询,`email`字段并没有加上 Flink SQL> select * from ods_user_info_cdc; +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+ | id | login_name | name | phone_num | birthday | gender | create_time | operate_time | pt | +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+ | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+ -- 其实,此时的schema已经改变,我们查看hdfs上的schema信息会发现`email`字实际上已经添加 [root@centos01 ~]# hdfs dfs -cat /user/hive/warehouse/paimon_db.db/ods_user_info_cdc/schema/schema-1 -- 因此,我们需要另外重新启动一个sql-client窗口进行查询,可以发现是正确的 Flink SQL> select * from ods_user_info_cdc; +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+ | id | login_name | name | phone_num | birthday | gender | create_time | operate_time | pt | email | +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+ | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | <NULL> | | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | <NULL> | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | <NULL> | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | <NULL> | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | <NULL> | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | hank@163 | +----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+ 2.2 整库同步 通过官方提供的paimon-action的jar包可以很方便的将 MySQL、Kafka、Mongo等中的数据实时摄入到Paimon中使用paimon-flink-action,采用mysql-sync-database(整库同步),并通过"–including_tables"参数选择要同步的表这种同步模式有效地节省了大量资源开销,相比每个表启动一个 Flink 任务而言,避免了资源的大量浪费。 [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \ /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \ mysql-sync-database \ --warehouse hdfs://centos01:8020/user/hive/warehouse \ --database paimon_db \ --table-prefix "ods_" \ --table-suffix "_mysql_cdc" \ --mysql-conf hostname=centos01 \ --mysql-conf username=root \ --mysql-conf password=123456 \ --mysql-conf database-name=cdc_db \ --catalog-conf metastore=hive \ --catalog-conf uri=thrift://centos01:9083 \ --table-conf bucket=2 \ --table-conf changelog-producer=input \ --table-conf sink.parallelism=2 \ --including-tables 'user_info|orders' -- 在一个flink任务中同步多个mysql表 -- 在paimon中会自动创建多张表 Flink SQL> select * from ods_orders_mysql_cdc; Flink SQL> select * from ods_user_info_mysql_cdc;大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(2)由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(2)”