SpringBoot集成Kettle
- IT业界
- 2025-09-04 13:45:01

Kettle 简介
Kettle 最初由 Matt Casters 开发,是 Pentaho 数据集成平台的一部分。它提供了一个用户友好的界面和丰富的功能集,使用户能够轻松地设计、执行和监控 ETL 任务。Kettle 通过其强大的功能和灵活性,帮助企业高效地处理大规模数据集成任务。
主要组成部分 Spoon: 用途:Spoon 是 Kettle 的图形化设计工具。用户可以使用 Spoon 设计和调试 ETL 转换和作业。功能:拖放式界面、预览数据、测试 ETL 流程、管理连接、编写脚本等。 Pan: 用途:Pan 是一个命令行工具,用于执行由 Spoon 设计的 ETL 转换。功能:通过命令行执行转换、调度作业、集成到其他自动化流程中。 Kitchen: 用途:Kitchen 是一个命令行工具,用于执行由 Spoon 设计的 ETL 作业。功能:通过命令行执行作业、调度作业、集成到其他自动化流程中。 Carte: 用途:Carte 是一个轻量级的 Web 服务器,提供远程执行和监控功能。功能:远程执行和监控 ETL 转换和作业、查看日志、管理集群等。 Repositories: 用途:存储和管理 ETL 转换和作业的地方。功能:可以使用数据库或文件系统作为存储库,支持版本控制和共享。 主要功能和特点数据提取:
支持多种数据源,如关系数据库、文件(CSV、Excel、XML 等)、大数据平台(Hadoop、Hive 等)、云存储(Amazon S3、Google Drive 等)、Web 服务和 API 等。数据转换:
丰富的转换步骤,包括数据清洗、数据聚合、数据过滤、数据排序、数据连接、数据拆分、数据类型转换等。数据加载:
支持将数据加载到多种目标系统中,如关系数据库、大数据平台、文件系统、云存储等。调度和自动化:
支持通过命令行工具(Pan 和 Kitchen)和调度器(如 cron 或 Windows 任务计划)进行调度和自动化执行。扩展性:
提供了插件机制,用户可以编写自定义插件,扩展 Kettle 的功能。支持 JavaScript 和 Java 进行脚本编写,增强转换和作业的灵活性。集群和并行处理:
支持集群模式,能够在分布式环境中并行处理大规模数据。提供了分布式 ETL 执行和负载均衡功能。数据质量和数据治理:
提供了数据验证、数据一致性检查和数据校验功能,帮助确保数据的质量和一致性。实时数据处理:
支持实时数据流处理,通过集成 Kafka、MQTT 等流处理平台,实现实时数据的提取、转换和加载。 集成 Kettle将 Kettle(Pentaho Data Integration, PDI)集成到 Spring Boot 项目中,可以实现 ETL 流程的自动化和集成化处理。以下是详细的集成过程:
准备工作 下载 Kettle:从 Pentaho 官网下载 Kettle(PDI)的最新版本,并解压到本地目录。Spring Boot 项目:确保已有一个 Spring Boot 项目,或新建一个 Spring Boot 项目。 引入 Kettle 依赖在 Spring Boot 项目的 pom.xml 文件中添加 Kettle 所需的依赖。你可以将 Kettle 的 JAR 文件添加到本地 Maven 仓库,或直接在项目中引入这些 JAR 文件。
<dependencies> <!-- Spring Boot 依赖 --> <!-- Kettle 依赖 --> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>9.4.0.0-343</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>9.4.0.0-343</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-dbdialog</artifactId> <version>9.4.0.0-343</version> </dependency> <dependency> <groupId>org.apache mons</groupId> <artifactId>commons-vfs2</artifactId> <version>2.7.0</version> </dependency> <!-- 根据需要添加其他 Kettle 依赖 --> <!-- 操作数据库数据时添加相应的数据库依赖 --> </dependencies> 处理密码加密在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件,用于配置密码加密插件:
<password-encoder-plugins> <password-encoder-plugin id="Kettle"> <description>Kettle Password Encoder</description> <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname> </password-encoder-plugin> </password-encoder-plugins>kettle-core依赖中org.pentaho.support.encryption.KettleTwoWayPasswordEncoder类实现了TwoWayPasswordEncoderInterface接口,用于处理密码的加密和解密操作。
添加 Spoon 的任务文件在 Kettle(Pentaho Data Integration,PDI)中,作业(Job)和转换(Transformation)是两种核心的 ETL 组件,它们在设计和功能上有着本质的区别。
转换(Transformation) 数据处理流程:转换是一个数据处理流程,专注于数据的提取(Extract)、转换(Transform)和加载(Load)。行级处理:转换以行级处理数据,每次处理一行数据,并将其传递给下一步骤。任务文件为.ktr文件。 作业(Job) 任务管理和控制流程:作业是一个任务管理和控制流程,负责调度和控制一系列任务的执行顺序。步骤级处理:作业以步骤为单位处理任务,每次执行一个步骤,然后根据条件决定执行下一个步骤。任务文件为.kjb文件。 区别 转换处理数据行,作业处理任务步骤。转换中的步骤是并行执行的,而作业中的步骤是顺序执行的。转换侧重于数据的处理和转换,作业侧重于任务的调度和管理。转换主要通过数据流控制,作业提供了丰富的逻辑控制(条件判断、循环、错误处理等)。转换适用于复杂的数据处理流程,作业适用于任务调度和控制。在 Spring Boot 项目的 resources 目录下,创建一个 kettle 目录,并将 Kettle 的任务文件(如 转换1.ktr)复制到该目录中。
编写 Kettle 服务类创建一个服务类,用于执行 Kettle 转换或作业。
package com.example.kettletest.service.impl; import com.example.kettletest.service.KettleJobService; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.util.EnvUtil; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Service; import java.io.File; import java.io.IOException; /** * @author 罗森 * @date 2024/6/6 13:21 */ @Service public class KettleJobServiceImpl implements KettleJobService { @Override public void runTaskFile(String taskFileName) { // 初始化 Kettle 环境 try { KettleEnvironment.init(); EnvUtil.environmentInit(); } catch (KettleException e) { throw new RuntimeException(e); } // 执行任务文件 if (taskFileName.endsWith(".ktr")) { taskFileKTR(taskFileName); } else if (taskFileName.endsWith(".kjb")) { taskFileKJB(taskFileName); } else { throw new IllegalArgumentException("Unsupported file type: " + taskFileName); } } /** * 针对kjb文件的操作 * @param taskFileName */ public void taskFileKJB(String taskFileName) { try { // 获取资源文件路径 ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName); File jobFile = resource.getFile(); // 加载 KJB 文件 JobMeta jobMeta = new JobMeta(jobFile.getAbsolutePath(), null); // 创建作业对象 Job job = new Job(null, jobMeta); // 启动作业 job.start(); // 等待作业完成 job.waitUntilFinished(); if (job.getErrors() > 0) { System.out.println("There were errors during job execution."); } else { System.out.println("Job executed successfully."); } } catch (IOException | KettleXMLException e) { e.printStackTrace(); } } /** * 针对ktr文件的操作 * @param taskFileName */ public void taskFileKTR(String taskFileName) { try { // 获取资源文件路径 ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName); File transFile = resource.getFile(); // 加载 KTR 文件 TransMeta transMeta = new TransMeta(transFile.getAbsolutePath()); // 创建转换对象 Trans trans = new Trans(transMeta); // 启动作业 trans.execute(null); // 等待作业完成 trans.waitUntilFinished(); if (trans.getErrors() > 0) { System.err.println("There were errors during Transformation execution."); } else { System.out.println("Transformation executed successfully!"); } } catch (IOException | KettleException e) { e.printStackTrace(); } } } 常见问题解决办法运行后报错信息为:Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath.
**解决办法:**在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件。
运行后报错信息为:ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : A serious error occurred during job execution: 无法找到作业的开始点.
**解决办法:**为Spoon制作的作业任务增加开始节点。
运行后报错信息为:Can't run transformation due to plugin missing.
**解决办法:**此问题通常出现在涉及类似于导出excel文件、json文件时。在初始化 Kettle 环境之前指明相关插件的绝对路径(相关插件通常在Kettle本地解压文件夹中的plugins目录下),新增以下代码:
StepPluginType.getInstance().getPluginFolders().add(new PluginFolder("E:\Kettle\pdi-ce-9.4.0.0-343\data-integration\plugins", false, true));将代码中的地址换成您本地的绝对地址。
(END) by luosen.
SpringBoot集成Kettle由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“SpringBoot集成Kettle”