主页 > 软件开发  > 

大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(1)

大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(1)
大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) Apache Paimon 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。 读/写:Paimon 支持多种读/写数据和执行 OLAP 查询的方式。 对于读取,支持如下三种方式消费数据 历史快照(批处理模式)最新的偏移量(流模式)混合模式下读取增量快照 对于写入,它支持来自数据库变更日志(CDC)的流式同步或来自离线数据的批量插入/覆盖。 生态系统:除了Apache Flink之外,Paimon还支持Apache Hive、Apache Spark、Trino等其他计算引擎的读取。底层存储:Paimon 将列式文件存储在文件系统/对象存储上,并使用 LSM 树结构来支持大量数据更新和高性能查询。Paimon 提供抽象概念的表: 在批处理执行模式下,它就像一个Hive表,支持Batch SQL的各种操作。 查询它以查看最新的快照。在流执行模式下,它的作用就像一个消息队列。 查询它的行为就像从历史数据永不过期的消息队列中查询stream changelog。

今天,我们快速了解下最近比较火的Apache Paimon: 官方文档: paimon.apache.org/docs/1.0/推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世 1 Apache Paimon的下载及安装 1.1 下载及安装 # flink 1.16下载地址 archive.apache.org/dist/flink/flink-1.16.0/ # 相关配置 [root@centos01 ~]# cat /opt/apps/flink-1.16.0/conf/flink-conf.yaml ...... # 解决中文乱码,1.17之前参数是env.java.opts,后面版本是env.java.opts.all env.java.opts: -Dfile.encoding=UTF-8 classloader.check-leaked-classloader: false execution.checkpointing.interval: 10s state.backend: rocksdb state.checkpoints.dir: hdfs://centos01:8020/flink_ck/ckps state.backend.incremental: true # 环境变量配置(需要FLINK_HOME以及HADOOP_CLASSPATH) [root@centos01 ~]# cat /etc/profile export JAVA_HOME=/opt/apps/jdk1.8.0_141 export HADOOP_HOME=/opt/apps/hadoop-3.1.1 export HADOOP_MAPRED_HOME=$HADOOP_HOME export HIVE_HOME=/opt/apps/hive-3.1.2 export SPARK_HOME=/opt/apps/spark-3.3.0 export FLINK_HOME=/opt/apps/flink-1.16.0 export SEATUNNEL_HOME=/opt/apps/seatunnel-2.3.8 export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.5.8 export HBASE_HOME=/opt/apps/hbase-2.2.5 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$FLINK_HOME/bin:$HIVE_HOME/bin:$SEATUNNEL_HOME/bin:$ZOOKEEPER_HOME/bin:$HBASE_HOME/bin::$SPARK_HOME/bin:$SPARK_HOME/sbin export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=`hadoop classpath` # paimon与flink集成的jar包下载地址(我这里使用flink 1.16) repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/1.0.0/ # action的包下载地址 repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/1.0.0/ # 上传paimon的jar包及action包到flink的lib目录 [root@centos01 apps]# mv ./paimon-flink-1.16-1.0.0.jar /opt/apps/flink-1.16.0/lib/ [root@centos01 apps]# mv ./paimon-flink-action-1.0.0.jar /opt/apps/flink-1.16.0/lib/ # 这里又添加了几个jar包用于后续使用 [root@centos01 ~]# ll /opt/apps/flink-1.16.0/lib/ # 手动添加的jar包,为了使用flink-sql客户端、整合hive、读取kafka数据、以及解决冲突等 -rw-r--r--. 1 root root 53820 Feb 14 14:34 commons-cli-1.4.jar -rw-r--r--. 1 root root 3534428 Feb 18 11:25 kafka-clients-2.6.2.jar -rw-r--r--. 1 root root 396461 Feb 18 11:07 flink-connector-kafka-1.16.0.jar -rw-r--r--. 1 root root 59604787 Feb 14 14:34 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar -rw-r--r--. 1 root root 8645789 Feb 14 14:58 flink-connector-hive_2.12-1.16.0.jar -rw-r--r--. 1 root root 50975967 Feb 14 15:49 flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar -rw-r--r--. 1 root root 1654887 Feb 14 14:46 hadoop-mapreduce-client-core-3.1.1.jar # paimon相关jar包 -rw-r--r--. 1 root root 48288345 Feb 14 14:40 paimon-flink-1.16-1.0.0.jar -rw-r--r--. 1 root root 11469 Feb 17 11:33 paimon-flink-action-1.0.0.jar # flink原始jar包 -rw-r--r--. 1 502 games 198855 Oct 20 2022 flink-cep-1.16.0.jar -rw-r--r--. 1 502 games 515825 Oct 20 2022 flink-connector-files-1.16.0.jar -rw-r--r--. 1 502 games 102470 Oct 20 2022 flink-csv-1.16.0.jar -rw-r--r--. 1 502 games 117102633 Oct 20 2022 flink-dist-1.16.0.jar -rw-r--r--. 1 502 games 180250 Oct 20 2022 flink-json-1.16.0.jar -rw-r--r--. 1 502 games 21052633 Oct 20 2022 flink-scala_2.12-1.16.0.jar -rw-r--r--. 1 502 games 10737871 Sep 20 2022 flink-shaded-zookeeper-3.5.9.jar -rw-r--r--. 1 502 games 15367434 Oct 20 2022 flink-table-api-java-uber-1.16.0.jar -rw-r--r--. 1 502 games 36237974 Oct 20 2022 flink-table-planner-loader-1.16.0.jar -rw-r--r--. 1 502 games 3133682 Oct 20 2022 flink-table-runtime-1.16.0.jar -rw-r--r--. 1 502 games 208006 Sep 20 2022 log4j-1.2-api-2.17.1.jar -rw-r--r--. 1 502 games 301872 Sep 20 2022 log4j-api-2.17.1.jar -rw-r--r--. 1 502 games 1790452 Sep 20 2022 log4j-core-2.17.1.jar -rw-r--r--. 1 502 games 24279 Sep 20 2022 log4j-slf4j-impl-2.17.1.jar # 在hive中也能查询已有的paimon表 # 下载地址 repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-3.1/1.0.0/ # 在Hive中,auxlib目录是一个特殊目录,用于存放Hive自动加载的第三方JAR包 [root@centos01 hive-3.1.2]# mkdir ./auxlib [root@centos01 hive-3.1.2]# cp /opt/apps/paimon-hive-connector-3.1-1.0.0.jar ./auxlib/ # 启动hadoop和yarn [root@centos01 ~]# start-all.sh # 启动Hive [root@centos01 ~]# nohup hive --service metastore 2>&1 & [root@centos01 ~]# nohup hive --service hiveserver2 2>&1 & # yarn-session模式 # 中http://centos01:8088/cluster可以看到Flink session cluster的job ID [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d # 启动Flink的sql-client [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session # paimon支持hdfs、Hive以及JDBC的catalog # 我们这里使用hive元数据(注意:下面的sql语句每次进入flink-sql客户端都要执行) Flink SQL> CREATE CATALOG paimon_hive_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://centos01:9083' ); Flink SQL> USE CATALOG paimon_hive_catalog; Flink SQL> create database if not exists paimon_db; Flink SQL> use paimon_db; # 设置显示模式 Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; # 设置流模式 Flink SQL> SET 'execution.runtime-mode' = 'streaming'; 1.2 文件布局简介 如下图所示,一张表的所有文件都存储在一个基本目录下,Paimon 文件以分层方式组织。核心概念参考官网: paimon.apache.org/docs/1.0/concepts/overview/

上图说明了 Paimon 的文件布局, 从snapshot文件开始,Paimon reader可以递归地访问表中的所有记录。

Snapshot Files:所有snapshot文件都存储在snapshot目录中。snapshot文件是一个 JSON 文件,包含有关此snapshot的信息,包括:

正在使用的Schema文件包含此snapshot的所有更改的清单列表(manifest list)

Manifest Files:

所有清单(manifest)列表和清单文件都存储在清单目录中。清单列表(manifest list)是清单文件名的列表。清单文件是包含有关 LSM 数据文件和changelog文件的更改的文件。 例如对应快照中创建了哪个LSM数据文件、删除了哪个文件。

Data Files:

数据文件按分区和桶(Bucket)分组。每个Bucket目录都包含一个 LSM 树及其changelog文件。目前,Paimon 支持使用 orc(默认)、parquet 和 avro 作为数据文件格式。

Paimon 采用 LSM 树(日志结构合并树)作为文件存储的数据结构。

LSM 树将文件组织成多个 sorted runs(如下图所示)。 sorted runs由一个或多个数据文件组成,并且每个数据文件恰好属于一个 sorted runs。

数据文件中的记录按其主键排序。 在 sorted runs中,数据文件的主键范围永远不会重叠。

如图所示,不同的 sorted runs可能具有重叠的主键范围,甚至可能包含相同的主键。查询LSM树时,必须合并所有 sorted runs,并且必须根据用户指定的合并引擎和每条记录的时间戳来合并具有相同主键的所有记录。

写入LSM树的新记录将首先缓存在内存中。当内存缓冲区满时,内存中的所有记录将被顺序并刷新到磁盘,并创建一个新的 sorted runs。

Compaction

当越来越多的记录写入LSM树时,sorted runs的数量将会增加。由于查询LSM树需要将所有 sorted runs合并起来,太多 sorted runs将导致查询性能较差,甚至内存不足。为了限制 sorted runs的数量,我们必须偶尔将多个 sorted runs合并为一个大的 sorted runs。 这个过程称为压缩。然而,压缩是一个资源密集型过程,会消耗一定的CPU时间和磁盘IO,因此过于频繁的压缩可能会导致写入速度变慢。 这是查询和写入性能之间的权衡。 Paimon 目前采用了类似于 Rocksdb 通用压缩的压缩策略。默认情况下,当Paimon将记录追加到LSM树时,它也会根据需要执行压缩。 用户还可以选择在专用压缩作业中执行所有压缩。 2 主键表引擎详解 当Paimon sink收到两条或更多具有相同主键的记录时,它会将它们合并为一条记录以保持主键唯一。通过指定paimon表的merge-engine属性,用户可以选择如何将记录合并在一起。 2.1 默认的合并引擎deduplicate deduplicate合并引擎是默认的合并引擎。设置deduplicate合并引擎,paimon表只会保留最新的记录,并丢弃其他具有相同主键的记录。如果最新的记录是DELETE记录,则所有具有相同主键的记录都将被删除。 Flink SQL> drop table if exists orders_dedup; -- orders_dedup文件目录:hdfs://centos01:8020/user/hive/warehouse/paimon_db.db/orders_dedup Flink SQL> CREATE TABLE orders_dedup ( order_id INT PRIMARY KEY NOT ENFORCED, customer_id INT, product_id INT, quantity INT, price DECIMAL(10, 2), update_time TIMESTAMP(3) ) WITH ( 'connector' = 'paimon', 'merge-engine' = 'deduplicate' ); -- 插入数据(有兴趣的可以看hdfs上目录的变化) Flink SQL> INSERT INTO orders_dedup VALUES (1, 101, 201, 2, 99.99, CURRENT_TIMESTAMP), (2, 102, 202, 1, 49.99, CURRENT_TIMESTAMP); -- 我们可以批读 Flink SQL> SET 'execution.runtime-mode' = 'batch'; Flink SQL> select * from orders_dedup; +----------+-------------+------------+----------+-------+-------------------------+ | order_id | customer_id | product_id | quantity | price | update_time | +----------+-------------+------------+----------+-------+-------------------------+ | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 | +----------+-------------+------------+----------+-------+-------------------------+ 2 rows in set -- 我们也可以流读 Flink SQL> SET 'execution.runtime-mode' = 'streaming'; Flink SQL> select * from orders_dedup; +----+-------------+-------------+-------------+-------------+--------------+-------------------------+ | op | order_id | customer_id | product_id | quantity | price | update_time | +----+-------------+-------------+-------------+-------------+--------------+-------------------------+ | +I | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 | | +I | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 | -- 我们在另一个flink-sql窗口,插入重复数据 INSERT INTO orders_dedup VALUES (1, 101, 201, 3, 99.99, CURRENT_TIMESTAMP); -- 更新数量 -- 在刚才orders_dedup流读窗口,可以发现出现了-U和+U +----+-------------+-------------+-------------+-------------+--------------+-------------------------+ | op | order_id | customer_id | product_id | quantity | price | update_time | +----+-------------+-------------+-------------+-------------+--------------+-------------------------+ | +I | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 | | +I | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 | | -U | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 | | +U | 1 | 101 | 201 | 3 | 99.99 | 2025-02-18 21:54:34.921 | -- 此时我们进行批量读取 Flink SQL> SET 'execution.runtime-mode' = 'batch'; Flink SQL> select * from orders_dedup; -- 可以发现,order_id为1行发生了变化 +----------+-------------+------------+----------+-------+-------------------------+ | order_id | customer_id | product_id | quantity | price | update_time | +----------+-------------+------------+----------+-------+-------------------------+ | 1 | 101 | 201 | 3 | 99.99 | 2025-02-18 21:54:34.921 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 | +----------+-------------+------------+----------+-------+-------------------------+ 2 rows in set -- 注意:在hive中也能读取此表(原始数据只有一份) -- 其实,还能够在hive中建立paimon表,并插入数据,这里不再演示 0: jdbc:hive2://192.168.42.101:10000> use paimon_db; No rows affected (0.718 seconds) 0: jdbc:hive2://192.168.42.101:10000> select * from orders_dedup; +------------------------+---------------------------+--------------------------+------------------------+---------------------+---------------------------+ | orders_dedup.order_id | orders_dedup.customer_id | orders_dedup.product_id | orders_dedup.quantity | orders_dedup.price | orders_dedup.update_time | +------------------------+---------------------------+--------------------------+------------------------+---------------------+---------------------------+ | 1 | 101 | 201 | 3 | 99.99 | 2025-02-18 21:54:34.921 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 | 2.2 partial-update

通过指定 'merge-engine' = 'partial-update',用户可以通过多次更新来更新记录的列,直到记录完成。

这是通过使用同一主键下的最新数据逐一更新值字段来实现的,注意:在此过程中不会覆盖空值。

如下所示:

<1, 23.0, 10, NULL>-<1, NULL, NULL, ‘This is a book’><1, 25.2, NULL, NULL>

假设第一列是主键key,那么最后的结果是 <1, 25.2, 10, ‘This is a book’>

Flink SQL> SET 'execution.runtime-mode' = 'batch'; Flink SQL> CREATE TABLE orders_partial_update ( order_id INT PRIMARY KEY NOT ENFORCED, customer_id INT, product_id INT, quantity INT, price DECIMAL(10, 2), update_time TIMESTAMP(3) ) WITH ( 'connector' = 'paimon', 'merge-engine' = 'partial-update', -- 指定部分更新 'partial-update.remove-record-on-delete' = 'true' ); -- 插入数据 Flink SQL> INSERT INTO orders_partial_update VALUES (1, 101, 201, 2, 99.99, CURRENT_TIMESTAMP) , (2, 102, 202, 1, 49.99, CURRENT_TIMESTAMP); Flink SQL> select * from orders_partial_update; +----------+-------------+------------+----------+-------+-------------------------+ | order_id | customer_id | product_id | quantity | price | update_time | +----------+-------------+------------+----------+-------+-------------------------+ | 1 | 101 | 201 | 2 | 99.99 | 2025-02-14 17:22:57.942 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 | +----------+-------------+------------+----------+-------+-------------------------+ -- 部分更新 Flink SQL> INSERT INTO orders_partial_update(order_id, customer_id, product_id, quantity, price, update_time) VALUES (1, cast(NULL as INT), cast(NULL as INT), 3, cast(NULL as DECIMAL(10, 2)), CURRENT_TIMESTAMP); Flink SQL> select * from orders_partial_update; -- 可以看到quantity和update_time发生了变化,其他字段不变 +----------+-------------+------------+----------+-------+-------------------------+ | order_id | customer_id | product_id | quantity | price | update_time | +----------+-------------+------------+----------+-------+-------------------------+ | 1 | 101 | 201 | 3 | 99.99 | 2025-02-14 17:27:49.498 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 | +----------+-------------+------------+----------+-------+-------------------------+ -- 时光旅行 Flink SQL> SELECT * FROM orders_partial_update /*+ OPTIONS('scan.snapshot-id' = '1') */; +----------+-------------+------------+----------+-------+-------------------------+ | order_id | customer_id | product_id | quantity | price | update_time | +----------+-------------+------------+----------+-------+-------------------------+ | 1 | 101 | 201 | 2 | 99.99 | 2025-02-14 17:22:57.942 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 | +----------+-------------+------------+----------+-------+-------------------------+ 2 rows in set Flink SQL> SELECT * FROM orders_partial_update /*+ OPTIONS('scan.snapshot-id' = '2') */; +----------+-------------+------------+----------+-------+-------------------------+ | order_id | customer_id | product_id | quantity | price | update_time | +----------+-------------+------------+----------+-------+-------------------------+ | 1 | 101 | 201 | 3 | 99.99 | 2025-02-14 17:27:49.498 | | 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 | +----------+-------------+------------+----------+-------+-------------------------+

那么,"partial-update"有何作用呢?

2.2.1 利用"partial-update"实现双流join 传统方式:Flink 双流 Join 需要将两个流的所有数据都存储在 Flink 状态中,以便进行匹配。随着数据量的增长,状态会变得非常庞大,可能导致内存和存储压力。Paimon 方式:Paimon 表作为外部存储,将 Join 结果直接写入表中,Flink 不需要维护庞大的状态。这显著减少了 Flink 的状态管理压力。 -- 创建paimon结果表 drop table if exists order_payments; CREATE TABLE order_payments ( order_id STRING, order_time TIMESTAMP comment '订单时间', customer_id STRING, total_amount DECIMAL(10, 2), payment_status STRING, payment_time TIMESTAMP comment '支付时间', PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'merge-engine' = 'partial-update', 'changelog-producer' = 'lookup', 'partial-update.remove-record-on-delete' = 'true' ); -- 订单信息表(临时表) CREATE TEMPORARY TABLE orders ( order_id STRING, order_time TIMESTAMP, customer_id STRING, total_amount DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'orders', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'properties.group.id' = 'testordersGroup', 'json.fail-on-missing-field' = 'false', 'scan.startup.mode' = 'earliest-offset', 'json.ignore-parse-errors' = 'true' ); -- 创建topic(临时表) /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 -- 启动命令行生产者,生产数据 /opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders --bootstrap-server centos01:9092 {"order_id": "order_1", "order_time": "2023-10-01 10:00:00", "customer_id": "customer_1", "total_amount": 100.00} {"order_id": "order_2", "order_time": "2023-10-01 11:00:00", "customer_id": "customer_2", "total_amount": 200.00} -- 支付信息表 CREATE TEMPORARY TABLE payments ( payment_id STRING, order_id STRING, payment_time TIMESTAMP, amount DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'payments', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'properties.group.id' = 'testpaymentsGroup', 'json.fail-on-missing-field' = 'false', 'scan.startup.mode' = 'earliest-offset', 'json.ignore-parse-errors' = 'true' ); -- 创建topic /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic payments --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 Flink SQL> SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 插入到paimon表中 Flink SQL> insert into order_payments select coalesce(o.order_id, p.order_id) as order_id, o.order_time, o.customer_id, o.total_amount, case when p.payment_id is not null then 'paid' else 'unpaid' end as payment_status, p.payment_time from orders o left join payments p on o.order_id = p.order_id; Flink SQL> SET 'execution.runtime-mode' = 'streaming'; Flink SQL> select * from order_payments; +----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+ | op | order_id | order_time | customer_id | total_amount | payment_status | payment_time | +----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+ | +I | order_1 | 2023-10-01 10:00:00.000000 | customer_1 | 100.00 | unpaid | <NULL> | | +I | order_2 | 2023-10-01 11:00:00.000000 | customer_2 | 200.00 | unpaid | <NULL> | -- 然后,启动命令行生产者,生产数据 /opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic payments --bootstrap-server centos01:9092 {"payment_id": "payment_1", "order_id": "order_1", "payment_time": "2023-10-01 10:30:00", "amount": 100.00} {"payment_id": "payment_2", "order_id": "order_2", "payment_time": "2023-10-01 11:30:00", "amount": 200.00} -- 可以发现当支付信息相关字段发生了更新 +----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+ | op | order_id | order_time | customer_id | total_amount | payment_status | payment_time | +----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+ | -U | order_1 | 2023-10-01 10:00:00.000000 | customer_1 | 100.00 | unpaid | <NULL> | | +U | order_1 | 2023-10-01 10:00:00.000000 | customer_1 | 100.00 | paid | 2023-10-01 10:30:00.000000 | | -U | order_2 | 2023-10-01 11:00:00.000000 | customer_2 | 200.00 | unpaid | <NULL> | | +U | order_2 | 2023-10-01 11:00:00.000000 | customer_2 | 200.00 | paid | 2023-10-01 11:30:00.000000 | 2.2.2 Sequence Field(序列字段)的使用 在分布式计算中,会出现导致数据无序的情况,这可能会导致错误具体可以参考如下案例: drop table if exists my_table_demo_1; CREATE TABLE my_table_demo_1 ( pk BIGINT PRIMARY KEY NOT ENFORCED, v1 DOUBLE, v2 BIGINT, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'paimon' ); -- 在分布式计算中,会出现导致数据无序的情况,如下所示: INSERT INTO my_table_demo_1 VALUES (1, 100.0, 10, TIMESTAMP '2025-02-18 15:01:00'), (1, 200.0, 20, TIMESTAMP '2025-02-18 15:02:00'), (1, 200.0, 20, TIMESTAMP '2025-02-18 15:00:00'), -- 乱序数据 (2, 300.0, 30, TIMESTAMP '2025-02-18 15:30:00'); -- 可以看到最后保留的是'2025-02-18 15:00:00'的数据 Flink SQL> SELECT * FROM my_table_demo_1; +----+---------------+---------------+----------------+-------------------------+ | op | pk | v1 | v2 | update_time | +----+---------------+---------------+----------------+-------------------------+ | +I | 1 | 200.0 | 20 | 2025-02-18 15:00:00.000 | | +I | 2 | 300.0 | 30 | 2025-02-18 15:30:00.000 | 利用Sequence Field(序列字段)可以解决上述问题,会根据设定的序列字段保留最终的数据。如果序列字段的值相同,则根据输入顺序决定哪一个最后合并。可以为sequence.field定义多个字段,例如 ‘update_time,flag’,在这种情况下,会按照顺序比较多个字段。用户自定义的序列字段与诸如first_row和first_value等特性冲突,这可能导致意外的结果。 drop table if exists my_table_demo_2; CREATE TABLE my_table_demo_2 ( pk BIGINT PRIMARY KEY NOT ENFORCED, v1 DOUBLE, v2 BIGINT, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'paimon', 'sequence.field' = 'update_time' -- 设置sequence.field为update_time ); -- 乱序数据 INSERT INTO my_table_demo_2 VALUES (1, 100.0, 10, TIMESTAMP '2025-02-18 15:01:00'), (1, 200.0, 20, TIMESTAMP '2025-02-18 15:02:00'), -- 这条记录将因为update_time较晚而被最后合并 (1, 200.0, 20, TIMESTAMP '2025-02-18 15:00:00'), (2, 300.0, 30, TIMESTAMP '2025-02-18 15:30:00'); Flink SQL> SELECT * FROM my_table_demo_2; +----+------------+---------------+------------+-------------------------+ | op | pk | v1 | v2 | update_time | +----+------------+---------------+------------+-------------------------+ | +I | 1 | 200.0 | 20 | 2025-02-18 15:02:00.000 | | +I | 2 | 300.0 | 30 | 2025-02-18 15:30:00.000 | 2.2.3 利用Sequence Group解决多流更新出现的混乱 指定Sequence Field并不能解决多流更新的部分更新表的乱序问题,因为多流更新时 Sequence(序列) 字段可能会被另一个流的最新数据覆盖。因此paimon引入了部分更新表的序列组(Sequence Group)机制: 每个流定义其自己的序列组,用来解决多流更新时出现混乱问题;真正的部分更新,而不仅仅是非空更新。 drop table if exists T; CREATE TABLE T ( k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine'='partial-update', 'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d' ); INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1); Flink SQL> select * from T; +---+---+---+-----+---+---+-----+ | k | a | b | g_1 | c | d | g_2 | +---+---+---+-----+---+---+-----+ | 1 | 1 | 1 | 1 | 1 | 1 | 1 | +---+---+---+-----+---+---+-----+ 1 row in set -- g_2为null, 因此c, d字段不会被更新 INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT)); Flink SQL> select * from T; +---+---+---+-----+---+---+-----+ | k | a | b | g_1 | c | d | g_2 | +---+---+---+-----+---+---+-----+ | 1 | 2 | 2 | 2 | 1 | 1 | 1 | +---+---+---+-----+---+---+-----+ 1 row in set -- g_1 is smaller, 因此a, b字段不会被更新 -- g_2不为空,因此c, d字段会被更新 INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3); Flink SQL> select * from T; +---+---+---+-----+---+---+-----+ | k | a | b | g_1 | c | d | g_2 | +---+---+---+-----+---+---+-----+ | 1 | 2 | 2 | 2 | 3 | 3 | 3 | +---+---+---+-----+---+---+-----+ 2.3 aggregation 有时用户只关心聚合结果,"aggregation"引擎根据聚合函数将同一主键下的各个值字段与最新数据一一聚合。每个不属于主键的字段都可以被赋予一个聚合函数,由 fields.<field-name>.aggregate-function 表属性指定,否则它将使用 last_non_null_value 聚合作为默认值。当前支持的聚合函数和数据类型有: sum:支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT 和 DOUBLE。min/max:支持 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。last_value / last_non_null_value:支持所有数据类型。listagg:支持STRING数据类型。bool_and / bool_or:支持BOOLEAN数据类型。first_value/first_not_null_value:支持所有数据类型。 只有 sum 支持撤回(UPDATE_BEFORE 和 DELETE),其他聚合函数不支持撤回。 如果允许某些函数忽略撤回消息,可以配置:'fields.${field_name}.ignore-retract'='true' drop table if exists my_agg; CREATE TABLE my_agg ( product_id BIGINT, price DOUBLE, sales BIGINT, PRIMARY KEY (product_id) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', 'changelog-producer' = 'lookup', 'fields.price.aggregate-function' = 'max', -- price字段将通过max函数聚合 'fields.sales.aggregate-function' = 'sum' -- sales字段将通过sum函数聚合 ); insert into my_agg values(1, 23.0, 15); Flink SQL> SET 'execution.runtime-mode' = 'streaming'; Flink SQL> select * from my_agg; +----+----------------------+--------------------------------+----------------------+ | op | product_id | price | sales | +----+----------------------+--------------------------------+----------------------+ | +I | 1 | 23.0 | 15 | -- 在另一个窗口,再次插入数据 Flink SQL> insert into my_agg values(1, 30.2, 20); -- 之前的流读窗口 +----+----------------------+--------------------------------+----------------------+ | op | product_id | price | sales | +----+----------------------+--------------------------------+----------------------+ | +I | 1 | 23.0 | 15 | | -U | 1 | 23.0 | 15 | | +U | 1 | 30.2 | 35 | 2.4 first-row 通过指定 'merge-engine' = 'first-row',用户可以保留同一主键的第一行。它与Deduplicate合并引擎不同,在First Row合并引擎中,它将生成仅insert changelog。 CREATE TABLE user_actions ( user_id INT, action_time TIMESTAMP, action STRING, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'merge-engine' = 'first-row', 'changelog-producer' = 'lookup', -- 必须与lookup changelog producer一起使用 'ignore-delete' = 'true' -- 忽略DELETE和UPDATE_BEFORE消息 ); -- 插入数据到 user_actions 表 INSERT INTO user_actions (user_id, action_time, action) VALUES (1, TIMESTAMP '2023-10-01 10:00:00', 'login'), (1, TIMESTAMP '2023-10-01 10:05:00', 'logout'), (2, TIMESTAMP '2023-10-01 10:10:00', 'login'), (1, TIMESTAMP '2023-10-01 10:15:00', 'login'), (2, TIMESTAMP '2023-10-01 10:20:00', 'logout'); Flink SQL> select * from user_actions; +----+-------------+----------------------------+--------------------------------+ | op | user_id | action_time | action | +----+-------------+----------------------------+--------------------------------+ | +I | 1 | 2023-10-01 10:00:00.000000 | login | | +I | 2 | 2023-10-01 10:10:00.000000 | login | 3 changelog-producer简介

官网介绍:Changelog Producer | Apache Paimon

源码级讲解: xie.infoq /article/f08b0912ef6565dd1cc78e988

flink中checkpoint和snapshot的联系

批写(手动执行sql脚本)每一个sql会立即生成一次checkpoint效果流式写入根据checkpoint间隔,定期进行checkpoint一次checkpoint会产生1-2个snapshot文件,具体是要看这次checkpoint是否触发compaction: 触发了就是2个data文件(一个是合并后的数据,一个本次checkpoint写入数据,默认产生5个snapshot就会触发compaction)否则只有一个(本次checkpoint写入数据)一次snapshot会产生一个data文件 3.1 none

'changelog-producer' = 'none'的情况下,一次操作产生一个data文件的,不会产生changelog文件;'changelog-producer' = 'none'的情况下,流式读取结果是正确的(含有-U、+U的op),虽然不像input模式有changelog文件产生,但是paimon会记录每次操作产生的快照,根据不同版本的快照数据,经过汇总能够推断出changelog; -- 'changelog-producer' = 'none'的应用场景: -- 用于只进行批处理,不进行流式处理的表 CREATE TABLE paimon_none_mode ( a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'changelog-producer' = 'none' ); -- 间隔一段时间插入 INSERT INTO paimon_none_mode VALUES (1, 1, '1'); INSERT INTO paimon_none_mode VALUES (1, 1, '2'); INSERT INTO paimon_none_mode VALUES (1, 1, '3'); INSERT INTO paimon_none_mode VALUES (1, 1, '4'); INSERT INTO paimon_none_mode VALUES (1, 1, '5'); -- 此时会有两个snapshot,即:snapshot-5和snapshot-6 INSERT INTO paimon_none_mode VALUES (1, 1, '6'); INSERT INTO paimon_none_mode VALUES (1, 1, '7'); INSERT INTO paimon_none_mode VALUES (1, 1, '8'); INSERT INTO paimon_none_mode VALUES (1, 1, '9'); -- 此时会有两个snapshot,即:snapshot-10和snapshot-11 INSERT INTO paimon_none_mode VALUES (1, 1, '10'); -- paimon会记录每次操作产生的快照,根据不同版本的快照数据,经过汇总能够推断出changelog -- hdfs目录中无changelog文件 Flink SQL> select * from paimon_none_mode /*+ OPTIONS('scan.snapshot-id' = '1') */; +----+-------------+-------------+--------------+ | op | a | b | c | +----+-------------+-------------+--------------+ | +I | 1 | 1 | 1 | | -U | 1 | 1 | 1 | | +U | 1 | 1 | 2 | | -U | 1 | 1 | 2 | | +U | 1 | 1 | 3 | | -U | 1 | 1 | 3 | | +U | 1 | 1 | 4 | | -U | 1 | 1 | 4 | | +U | 1 | 1 | 5 | | -U | 1 | 1 | 5 | | +U | 1 | 1 | 6 | | -U | 1 | 1 | 6 | | +U | 1 | 1 | 7 | | -U | 1 | 1 | 7 | | +U | 1 | 1 | 8 | | -U | 1 | 1 | 8 | | +U | 1 | 1 | 9 | | -U | 1 | 1 | 9 | | +U | 1 | 1 | 10 | 不过当快照被删除之后,none模式就无法自动推算changelog了。因此这个none模式的changelog实际上是不够稳定的,而且比较耗费运算资源;由于批处理模式不会使用changelog,因此批处理模式和none模式是比较搭配的。 drop table paimon_none_mode_expire; CREATE TABLE paimon_none_mode_expire ( a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'changelog-producer' = 'none', 'snapshot.num-retained.min' = '10', 'snapshot.num-retained.max' = '10' -- 设置最多保存10个快照 ); -- 间隔一段时间插入 INSERT INTO paimon_none_mode_expire VALUES (1, 1, '1'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '2'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '3'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '4'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '5'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '6'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '7'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '8'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '9'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '10'); INSERT INTO paimon_none_mode_expire VALUES (1, 1, '11'); -- 可以发现之前快照已经删除 [root@centos01 ~]# hdfs dfs -ls /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/* -rw-r--r-- 1 root supergroup 1 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/EARLIEST -rw-r--r-- 1 root supergroup 2 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/LATEST -rw-r--r-- 1 root supergroup 622 2025-02-18 18:26 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-10 -rw-r--r-- 1 root supergroup 624 2025-02-18 18:26 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-11 -rw-r--r-- 1 root supergroup 622 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-12 -rw-r--r-- 1 root supergroup 622 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-13 -rw-r--r-- 1 root supergroup 621 2025-02-18 18:24 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-4 -rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-5 -rw-r--r-- 1 root supergroup 623 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-6 -rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-7 -rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-8 -rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-9 Flink SQL> select * from paimon_none_mode_expire /*+ OPTIONS('scan.snapshot-id' = '1')*/; +----+-------------+-------------+----------------+ | op | a | b | c | +----+-------------+-------------+----------------+ | +I | 1 | 1 | 4 | | -U | 1 | 1 | 4 | | +U | 1 | 1 | 5 | | -U | 1 | 1 | 5 | | +U | 1 | 1 | 6 | | -U | 1 | 1 | 6 | | +U | 1 | 1 | 8 | | -U | 1 | 1 | 8 | | +U | 1 | 1 | 7 | | -U | 1 | 1 | 7 | | +U | 1 | 1 | 9 | | -U | 1 | 1 | 9 | | +U | 1 | 1 | 10 | | -U | 1 | 1 | 10 | | +U | 1 | 1 | 11 | 3.2 input

'changelog-producer' = 'input'的情况下,一次checkpoint产生一个data文件的同时,也会产生一个changelog文件,其中changelog文件内容和data文件内容完全一致;'changelog-producer' = 'input'的情况下,如果你的操作不完整,那么流式读取的结果也是不对的。 例如insert2次相同主键,按照主键表的逻辑,应该是会出现-D +I 或者 -U +U 的场景,但是由于input模式,不会额外的处理changelog,insert两次,changelog就写两次insert要想对,那就需要先insert,然后delete,再insert,或者执行update操作 'changelog-producer' = 'input'就是为流处理设计的,但是数据处理操作必须要标准,即必须要有-U +U -D +I的操作,而CDC任务接到的数据就是标准的,因此一般用于cdc采集且后期要进行流式处理的表 -- 应用场景: -- 一般用于cdc采集且后期要进行流式处理的表 CREATE TABLE paimon_input_mode ( a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'changelog-producer' = 'input' ); -- 插入重复的主键数据 -- 第一次插入 INSERT INTO paimon_input_mode VALUES (1, 1, '1'); -- 延迟一段时间后,在另一个窗口第二次插入 INSERT INTO paimon_input_mode VALUES (1, 1, '2'); Flink SQL> SET 'execution.runtime-mode' = 'streaming'; [INFO] Session property has been set. -- 可以发现没有-U和+U Flink SQL> select * from paimon_input_mode; +----+-------------+-------------+--------------+ | op | a | b | c | +----+-------------+-------------+--------------+ | +I | 1 | 1 | 1 | | +I | 1 | 1 | 2 | 3.3 lookup

'changelog-producer' = 'lookup'会查询变化前的数据,并对比变化后数据,自动生成一份chagelog。相比input模式而言,lookup会自己产生正确的changelog,而不管输入数据是否符合规范;

lookup模式针对于不是cdc采集的表,而且要用于流式处理的表;

Lookup 会将数据缓存在内存和本地磁盘上,可以使用以下选项来调整性能:

OptionDefaultTypeDescriptionlookup.cache-file-retention1 hDurationThe cached files retention time for lookup. After the file expires, if there is a need for access, it will be re-read from the DFS to build an index on the local disk.lookup.cache-max-disk-sizeunlimitedMemorySizeMax disk size for lookup cache, you can use this option to limit the use of local disks.lookup.cache-max-memory-size256 mbMemorySizeMax memory size for lookup cache.

应用场景:

非cdc采集的表且更在乎数据延迟使用了聚合引擎或者写入过程有计算逻辑流处理表使用了first-row合并引擎 3.4 full-compaction

'changelog-producer' = 'full-compaction'模式相比于lookup模式区别:

lookup模式:一次checkpoint产生一次changelog,数据延迟小,资源消耗大full-compaction模式:n次checkpoint产生一次changelog,数据延迟大,资源消耗小

通过指定 full-compaction.delta-commits 表属性,在增量提交(检查点 checkpoint)后将不断触发 full compaction。 默认情况下设置为 1,因此每个检查点都会进行完全压缩并生成change log。

应用场景:

非cdc采集的表且不在乎数据延迟的场景

标签:

大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(1)由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“大数据组件(四)快速入门实时数据湖存储系统ApachePaimon(1)