主页 > 手机  > 

深入理解Kafka主题分区机制

深入理解Kafka主题分区机制

在分布式消息系统中,Apache Kafka 的主题分区机制是其核心特性之一。它不仅提供了高吞吐量和可扩展性,还通过分区实现了消息的有序存储和高效消费。本文将通过详细的代码示例和分析,帮助读者深入理解 Kafka 的主题分区机制。 一、Kafka 分区的基本概念 在 Kafka 中,每个主题(Topic)被划分为多个分区(Partition)。分区是 Kafka 存储消息的基本单位,每个分区是一个有序的、不可变的消息序列。消息在分区中被分配一个唯一的偏移量(Offset),用于标识消息在分区中的位置。生产者(Producer)在发送消息时可以指定分区,也可以让 Kafka 自动分配分区。消费者(Consumer)按照偏移量顺序读取消息,从而保证消息的顺序性。 二、创建主题和分区 在 Kafka 中,可以通过 Admin API 创建主题并指定分区数量。以下是一个简单的 Java 示例代码,展示如何使用 Kafka 的 AdminClient 创建主题: java复制 package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections; import java.util.Properties;

public class TopicCreator { public static void main(String[] args) throws Exception { createTopic(“example-topic-1”, 1); createTopic(“example-topic-2”, 2); }

private static void createTopic(String topicName, int numPartitions) throws Exception { Properties config = new Properties(); config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); AdminClient admin = AdminClient.create(config); // 检查主题是否已存在 boolean alreadyExists = admin.listTopics().names().get().stream() .anyMatch(existingTopicName -> existingTopicName.equals(topicName)); if (alreadyExists) { System.out.printf("主题已存在: %s%n", topicName); } else { // 创建新主题 System.out.printf("创建主题: %s%n", topicName); NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1); admin.createTopics(Collections.singleton(newTopic)).all().get(); } // 描述主题 System.out.println("-- 描述主题 --"); admin.describeTopics(Collections.singleton(topicName)).all().get() .forEach((topic, desc) -> { System.out.println("主题: " + topic); System.out.printf("分区数量: %s, 分区ID: %s%n", desc.partitions().size(), desc.partitions().stream().map(p -> Integer.toString(p.partition())).collect(Collectors.joining(","))); }); }

} 运行上述代码后,会创建两个主题:example-topic-1 和 example-topic-2,分别包含 1 个和 2 个分区。 三、消息发送与分区 (一)指定分区发送消息 生产者在发送消息时可以显式指定分区。以下代码展示了如何向单分区主题发送消息: java复制 package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample1 { private static int PARTITION_COUNT = 1; private static String TOPIC_NAME = “example-topic-1”; private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception { KafkaProducer<String, String> producer = ExampleHelper.createProducer(); for (int i = 0; i < MSG_COUNT; i++) { String value = "message-" + i; String key = Integer.toString(i); producer.send(new ProducerRecord<>(TOPIC_NAME, 0, key, value)); } }

} 运行结果如下: 复制 发送消息主题: example-topic-1, key: 0, value: message-0, 分区: 0 发送消息主题: example-topic-1, key: 1, value: message-1, 分区: 0 发送消息主题: example-topic-1, key: 2, value: message-2, 分区: 0 发送消息主题: example-topic-1, key: 3, value: message-3, 分区: 0 (二)多分区主题的消息发送 对于多分区主题,生产者可以将消息发送到不同的分区: java复制 package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample2 { private static int PARTITION_COUNT = 2; private static String TOPIC_NAME = “example-topic-2”; private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception { KafkaProducer<String, String> producer = ExampleHelper.createProducer(); for (int i = 0; i < MSG_COUNT; i++) { for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) { String value = "message-" + i; String key = Integer.toString(i); producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value)); } } }

} 运行结果如下: 复制 发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 0 发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 1 发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 0 发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 1 发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 0 发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 1 发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 0 发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 1 (三)不指定分区发送消息 如果生产者不显式指定分区,Kafka 会根据默认的分区策略(通常基于消息的键)选择分区: java复制 package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample3 { private static String TOPIC_NAME = “example-topic-2”; private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception { KafkaProducer<String, String> producer = ExampleHelper.createProducer(); for (int i = 0; i < MSG_COUNT; i++) { String value = "message-" + i; String key = Integer.toString(i); producer.send(new ProducerRecord<>(TOPIC_NAME, key, value)); } }

} 运行结果如下: 复制 发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 未指定 分区分配: 0 发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 未指定 分区分配: 1 发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 未指定 分区分配: 0 发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 未指定 分区分配: 1 四、消息消费与分区 消费者按照分区顺序读取消息。以下代码展示了如何消费单分区和多分区主题的消息: java复制 package com.logicbig.example;

import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration; import java.util.Arrays; import java.util.Properties;

public class ConsumerExample { public static void main(String[] args) throws Exception { KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(“example-topic-2”); int numMsgReceived = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord<String, String> record : records) { numMsgReceived++; System.out.printf(“消费消息: key = %s, value = %s, 分区ID = %s, 偏移量 = %s%n”, record.key(), record.value(), record.partition(), record.offset()); } consumer mitSync(); if (numMsgReceived >= 8) { break; } } } } 运行结果如下: 复制 消费消息: key = 0, value = message-0, 分区ID = 0, 偏移量 = 0 消费消息: key = 1, value

标签:

深入理解Kafka主题分区机制由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“深入理解Kafka主题分区机制