Flink导入StarRocks
- IT业界
- 2025-07-21 18:58:52

1、pom依赖

```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- Apache Flink 的依赖, 这些依赖项,生产环境可以不打包到JAR文件中. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- flink-connector-starrocks -->
    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>flink-connector-starrocks</artifactId>
        <version>1.2.5_flink-1.13_2.12</version>
    </dependency>
</dependencies>
```

2、代码编写

```java
public class LoadJsonRecords {

    public static void main(String[] args) throws Exception {
        // To run the example, you should prepare in the following steps
        // 1. create a primary key table in your StarRocks cluster. The DDL is
        //    CREATE DATABASE `test`;
        //    CREATE TABLE `test`.`score_board`
        //    (
        //        `id` int(11) NOT NULL COMMENT "",
        //        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        //        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
        //    )
        //    ENGINE=OLAP
        //    PRIMARY KEY(`id`)
        //    COMMENT "OLAP"
        //    DISTRIBUTED BY HASH(`id`)
        //    PROPERTIES(
        //        "replication_num" = "1"
        //    );
        //
        // 2. replace the connector options "jdbc-url" and "load-url" with your cluster configurations
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
        String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");

        //String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
        //String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");

        //String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
        //String loadUrl = params.get("loadUrl", "be-ip:8040,be-ip:8040,be-ip:8040");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate json-format records. Each record has three fields correspond to
        // the columns `id`, `name`, and `score` in StarRocks table.
        String[] records = new String[]{
                "{\"id\":1111, \"name\":\"starrocks-json\", \"score\":100}",
                "{\"id\":2222, \"name\":\"flink-json\", \"score\":100}",
        };
        DataStream<String> source = env.fromElements(records);

        // Configure the connector with the required properties, and you also need to add properties
        // "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector the
        // input records are json-format.
        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", jdbcUrl)
                .withProperty("load-url", loadUrl)
                .withProperty("database-name", "tmp")
                .withProperty("table-name", "score_board")
                .withProperty("username", "")
                .withProperty("password", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .withProperty("sink.parallelism","1")
                //.withProperty("sink.version","V1")
                .build();

        // Create the sink with the options
        SinkFunction<String> starRockSink = StarRocksSink.sink(options);
        source.addSink(starRockSink);

        env.execute("LoadJsonRecords");
    }
}
```
Flink导入StarRocks由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Flink导入StarRocks”