Kafka消息服务之Java工具类
- 创业
- 2025-08-27 19:06:02

注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;
Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。 Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。
Java工具类
此基于kafka客户端的工具类,提供基础的消息发送与监听功能。
pom.xml
<!-- 集成kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.2</version> </dependency>KafkaUtils.java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka mon.serialization.StringDeserializer; import org.apache.kafka mon.serialization.StringSerializer; import java.util.Collections; import java.util.Properties; import java.util.concurrent.Future; /** * @Description kafka工具类,提供消息发送与监听 */ public class KafkaUtils { /** * 获取实始化KafkaStreamServer对象 * @return */ public static KafkaStreamServer bulidServer(){ return new KafkaStreamServer(); } /** * 获取实始化KafkaStreamClient对象 * @return */ public static KafkaStreamClient bulidClient(){ return new KafkaStreamClient(); } public static class KafkaStreamServer{ KafkaProducer<String, String> kafkaProducer = null; private KafkaStreamServer(){} /** * 创建配置属性 * @param host * @param port * @return */ public KafkaStreamServer createKafkaStreamServer(String host, int port){ String bootstrapServers = String.format("%s:%d", host, port); if (kafkaProducer != null){ return this; } Properties properties = new Properties(); //kafka地址,多个地址用逗号分割 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); kafkaProducer = new KafkaProducer<>(properties); return this; } /** * 向kafka服务发送生产者消息 * @param topic * @param msg * @return */ public Future<RecordMetadata> sendMsg(String topic, String msg){ ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg); Future<RecordMetadata> future = kafkaProducer.send(record); System.out.println("消息发送成功:" + msg); return future; } /** * 关闭kafka连接 */ public void close(){ if (kafkaProducer != null){ kafkaProducer.flush(); kafkaProducer.close(); kafkaProducer = null; } } } public static class KafkaStreamClient { KafkaConsumer<String, String> kafkaConsumer = null; private KafkaStreamClient(){} /** * 配置属性,创建消费者 * @param host * @param port * @return */ public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){ String bootstrapServers = String.format("%s:%d", host, port); if (kafkaConsumer != null){ return this; } Properties properties = new Properties(); //kafka地址,多个地址用逗号分割 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); kafkaConsumer = new KafkaConsumer<String, String>(properties); return this; } /** * 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息 * @param topic * @param headerInterface */ public void pollMsg(String topic, HeaderInterface headerInterface) { kafkaConsumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { try{ headerInterface.execute(record); }catch(Exception e){ e.printStackTrace(); } } } } /** * 关闭kafka连接 */ public void close(){ if (kafkaConsumer != null){ kafkaConsumer.close(); kafkaConsumer = null; } } } @FunctionalInterface interface HeaderInterface{ void execute(ConsumerRecord<String, String> record); } /** * 测试示例 * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { //生产者发送消息 // KafkaStreamServer kafkaStreamServer = KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092); // int i=0; // while (i<10) { // String msg = "Hello," + new Random().nextInt(100); // kafkaStreamServer.sendMsg("test", msg); // i++; // Thread.sleep(100); // } // kafkaStreamServer.close(); // System.out.println("发送结束"); System.out.println("接收消息"); KafkaStreamClient kafkaStreamClient = KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45"); kafkaStreamClient.pollMsg("test", new HeaderInterface() { @Override public void execute(ConsumerRecord<String, String> record) { System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value())); } }); } }Kafka消息服务之Java工具类由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Kafka消息服务之Java工具类”