kafkaconsumer手动ack
- 手机
- 2025-09-19 14:30:02

在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer 类中的 commitSync() 或 commitAsync() 方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。
以下是一个简单的手动 ack 的示例代码:
1. 配置 KafkaConsumer 和手动确认消费 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka mon.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaManualAckConsumer { public static void main(String[] args) { // 配置消费者的基本属性 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 服务器地址 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组ID properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息key反序列化 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息value反序列化 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交,启用手动提交 // 创建 KafkaConsumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 订阅主题 consumer.subscribe(Collections.singletonList("my-topic")); try { while (true) { // 拉取消息 var records = consumer.poll(1000); // 拉取数据,等待最多1000ms // 处理每一条消息 records.forEach(record -> { System.out.println("Consumed message: " + record.value()); // 处理完消息后手动提交偏移量 // commitSync: 确保消息成功提交 consumer mitSync(); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭消费者 consumer.close(); } } } 2. 代码解析 配置消费者: ENABLE_AUTO_COMMIT_CONFIG 设置为 false,禁用自动提交偏移量。这样就可以在处理完每条消息后手动提交。 消息消费与手动 ack: poll(1000) 方法拉取最多 1000 毫秒内的消息。commitSync() 方法用于同步提交当前的偏移量,即消费到的消息的位移,这样可以确保 Kafka 消费者确认该消息已处理。 异常处理: 异常捕获块 catch 用于处理消费过程中可能出现的任何错误,确保程序不会崩溃。 关闭消费者: 在 finally 块中调用 consumer.close() 来关闭消费者连接。 3. 使用 commitAsync 提高性能(可选)如果对性能要求更高,可以考虑使用 commitAsync() 方法,它不会阻塞当前线程,提交操作将在后台异步完成:
consumer mitAsync((offsets, exception) -> { if (exception != null) { System.out.println("Error committing offset: " + exception.getMessage()); } else { System.out.println("Successfully committed offsets: " + offsets); } });这样你可以不阻塞线程,提高消费性能,尤其是在高吞吐量的环境中。 如果你有其他的需求或者想更细致地控制消费的细节,随时告诉我!
kafkaconsumer手动ack由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“kafkaconsumer手动ack”
上一篇
RocketMQ的运行架构