在Nodejs中使用kafka(四)消息批量发送,事务
- 游戏开发
- 2025-09-02 10:51:02

消息批量发送
生产者使用sendBatch同时发送多个主题消息,给消费者组group1和group2,消费者使用eachBatch接搜消息。
producer.ts
import { CompressionTypes, Kafka } from 'kafkajs'; async function run() { const kafka = new Kafka({ clientId: 'test5', brokers: ['localhost:9092'], connectionTimeout: 1000, // 1 秒连接超时 }); const producer = kafka.producer(); await producer.connect(); for (let i = 1; i <= 10; ++i) { let name = Math.random().toString().slice(2, -1); let age = Math.ceil(Math.random() * 40); let sex = Math.random() * 100 > 50 ? 1 : 0; let person = { id: i, name, age, sex, desc: `id:${i},姓名:${name},年龄:${age},性别:${sex}`, }; await producer.sendBatch({ // acks = 0:不等待任何副本确认,性能最好,但数据丢失风险最大。 // acks = 1:只等待 leader 副本确认,性能较好,但在 leader 宕机时仍有丢失风险。 // acks = -1 或 acks = all:等待所有副本确认,保证最高数据可靠性,但牺牲吞吐量和增加延迟。 acks: -1, topicMessages: [1, 2, 3].map((item) => { return { topic: `topic${item}`, messages: [1, 2, 3].map(item => { return { value: `${person.id}.${item}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`, headers: { id: String(item) }, }; }), }; }), compression: CompressionTypes.GZIP, }); console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`); } await producer.disconnect(); } run();consumer.ts
import { Kafka } from 'kafkajs' const kafka = new Kafka({ clientId: 'test3', brokers: ['localhost:9092'], connectionTimeout: 100, // 0.1 秒连接超时 requestTimeout: 1000, }); const consumer = kafka.consumer({ groupId: 'group1', rackId: 'test2.group1.consumer1', maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes) sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s) rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s) heartbeatInterval: 6000, // 心跳间隔(默认 3s) maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s) allowAutoTopicCreation: true, // topic不存在自动创建 }); await consumer.connect(); await consumer.subscribe({ topic: 'topic1', fromBeginning: true }); await consumer.subscribe({ topic: 'topic2', fromBeginning: true }); await consumer.subscribe({ topic: 'topic3', fromBeginning: true }); await consumer.run({ autoCommit: false, eachBatch: async ({ batch, resolveOffset }) => { batch.messages.forEach(msg => { console.log('batch:', batch.topic, msg.headers?.id?.toString?.(), msg.value?.toString()); resolveOffset(batch.lastOffset()); }); }, });consumer2.ts
import { Kafka } from 'kafkajs' const kafka = new Kafka({ clientId: 'test4', brokers: ['localhost:9092'], connectionTimeout: 100, // 0.1 秒连接超时 requestTimeout: 1000, }); const consumer = kafka.consumer({ groupId: 'group2', rackId: 'test2.group2.consumer2', maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes) sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s) rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s) heartbeatInterval: 6000, // 心跳间隔(默认 3s) maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s) allowAutoTopicCreation: true, // topic不存在自动创建 }); await consumer.connect(); await consumer.subscribe({ topic: 'topic1', fromBeginning: true }); await consumer.subscribe({ topic: 'topic2', fromBeginning: true }); await consumer.subscribe({ topic: 'topic3', fromBeginning: true }); await consumer.run({ autoCommit: false, // 禁用自动提交 partitionsConsumedConcurrently: 10, // 控制每次最多消费的分区数 eachBatch: async ({ batch, resolveOffset, heartbeat }) => { batch.messages.forEach(msg => { console.log('batch:', msg.value?.toString()); }); resolveOffset(batch.lastOffset()); // await heartbeat(); }, }); 事务生产者启动事务发送消息给两个不同topic的消费者,当本次事务数据全部发送成功并确认后会向下偏移,如果有消息处理不成功会回滚整个事务。
producer.ts
import { CompressionTypes, Kafka } from 'kafkajs'; async function run() { const kafka = new Kafka({ clientId: 'test6', brokers: ['localhost:9092'], connectionTimeout: 1000, // 1 秒连接超时 }); const producer = kafka.producer({ transactionalId: 't1', maxInFlightRequests: 1, // 仅一个请求在等待响应时发送 idempotent: true, // 启用幂等性,确保消息不会重复,精确一次语义 }); await producer.connect(); for (let i = 1; i <= 10; ++i) { let name = Math.random().toString().slice(2, -1); let age = Math.ceil(Math.random() * 40); let sex = Math.random() * 100 > 50 ? 1 : 0; let person = { id: i, name, age, sex, desc: `id:${i},姓名:${name},年龄:${age},性别:${sex}`, }; const transaction = await producer.transaction() try { await transaction.send({ topic: 'topic1', compression: CompressionTypes.GZIP, messages: [ { value: `transcation,${person.id}.${i}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`, headers: { id: String(i) }, } ], }); await transaction mit(); } catch (e) { await transaction.abort(); } console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`); } await producer.disconnect(); } run();consumer.ts
import { Kafka } from 'kafkajs' const kafka = new Kafka({ clientId: 'test7', brokers: ['localhost:9092'], connectionTimeout: 100, // 0.1 秒连接超时 requestTimeout: 1000, }); const consumer = kafka.consumer({ groupId: 'group1', rackId: 'test7.group1.consumer1', maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes) sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s) rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s) heartbeatInterval: 6000, // 心跳间隔(默认 3s) maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s) allowAutoTopicCreation: true, // topic不存在自动创建 }); await consumer.connect(); await consumer.subscribe({ topic: 'topic1', fromBeginning: true }); await consumer.run({ autoCommit: false, eachMessage: async ({ message, topic, partition }) => { console.log(topic, partition, message.value?.toString(), message.headers?.id?.toString(), message.offset); consumer mitOffsets([{ topic, partition, offset: String(Number(message.offset) + 1) }]); }, });consumer2.ts
import { Kafka } from 'kafkajs' const kafka = new Kafka({ clientId: 'test8', brokers: ['localhost:9092'], connectionTimeout: 100, // 0.1 秒连接超时 requestTimeout: 1000, }); const consumer = kafka.consumer({ groupId: 'group2', rackId: 'test8.group2.consumer2', maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes) sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s) rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s) heartbeatInterval: 6000, // 心跳间隔(默认 3s) maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s) allowAutoTopicCreation: true, // topic不存在自动创建 }); await consumer.connect(); await consumer.subscribe({ topic: 'topic1', fromBeginning: true }); await consumer.run({ autoCommit: false, eachMessage: async ({ message, topic, partition }) => { console.log(topic, partition, message.value?.toString(), message.headers?.id?.toString(), message.offset); consumer mitOffsets([{ topic, partition, offset: String(Number(message.offset) + 1) }]); }, });
在Nodejs中使用kafka(四)消息批量发送,事务由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“在Nodejs中使用kafka(四)消息批量发送,事务”