主页 > 创业  > 

Kafka集群,常见MQ面试问题

Kafka集群,常见MQ面试问题
Kafka 名词介绍

Topic: 消息队列,生产者和消费者面向的都是一个TopicBroker: 一个Kafka服务器就是一个Broker,一个集群由多个Broker组成。一个Broker可以容纳多个TopicProducer: 消息生产者,向Kafka Broker发生消息的客户端Consumer: 消息消费者,向Kafka Broker取消息的客户端Consumer Group(CG): 消费者组,有多个Consumer组成。消费者组内每个消费者负责不同分区的数据,一个分区只能有一个组内消费者消费;消费者组之间互不影响。所有消费者都属于某一个消费者组Partition: 为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个topic可以分为多个partitionReplica: 副本,每一个分区都有若干个副本,保证系统的稳定性Leader: 分区副本的“领导者”,生产者发送数据的对象,消费者消费数据的对象Follower: 分区副本的“从”,保持和Leader数据的同步。Leader发送故障,会从Follower中选举新的Leader Kafka集群安装

使用docker对Kafka进行集群安装,docker-compose.yaml

# 切换到目标目录 # 目录结构 #. #├── docker-compose.yaml #│ #├── kafka_data1 #│ #├── kafka_data2 #│ #├── kafka_data3 #│ #├── zookeeper_data1 #│ #├── zookeeper_data2 #│ #└── zookeeper_data3 # 启动 docker-compose up -d 配置

详情见server.properties配置

操作命令

进入容器docker exec -it kafka1 /bin/bash

Topic

可以使用kafka-topics.sh查看操作主题的命令,常用命令如下

参数描述--bootstrap-server <String: server to connect to>连接的 Kafka Broker 主机名称和端口号。--topic <String: topic>操作的主题名称。--create创建主题。--delete删除主题。--alter修改主题配置。--list列出所有主题。--describe查看指定主题的详细信息。--partitions <Integer: # of partitions>设置主题的分区数量(仅在创建主题时使用)。--replication-factor <Integer: replication factor>设置分区的副本因子(仅在创建主题时使用)。--config <String: name=value>更新或设置主题的配置参数。 # 创建一个主题 kafka-topics.sh --bootstrap-server kafka1:9093 --create --partitions 1 --replication-factor 3 --topic first ## --partitions 分区数 ## --replication-factor 副本数 ## --topic 主题名称 # 查看主题列表 kafka-topics.sh --bootstrap-server kafka1:9093 --list # 查看主题详情 kafka-topics.sh --bootstrap-server kafka1:9093 --describe --topic first Topic: first TopicId: nmM8zHmmRsqm5WjhTXuYHw PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: first Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Elr: N/A LastKnownElr: N/A ## TopicId Topic 唯一标识符(UUID) ## Partition: 0 Topic 中的第一个分区(编号从 0 开始) ## Leader: 2 分区 0 的 Leader 是 Broker 2 # 修改分区数,分区数只能增加,不能减少 kafka-topics.sh --bootstrap-server kafka1:9093 --alter --topic first --partitions 3 # 删除分区 kafka-topics.sh --bootstrap-server kafka1:9093 --topic first --delete Producer

可以使用kafka-console-producer.sh查看生产者的命令,常用命令如下

参数描述--bootstrap-server <String: server to connect to>连接的 Kafka Broker 主机名称和端口号。--topic <String: topic>操作的主题名称。 # 生产消息 kafka-console-producer.sh --bootstrap-server kafka2:9094 --topic first 发送消息流程

生产者代码

kafkaProducer需要配置一些基本的参数,比如 Kafka 集群的地址、序列化方式、重试策略等发送消息,当batch.size=16k || linger.ms = 0ms时会调用send()方法将消息发送到指定的 topic 和 partition发送请求到Broker Kafka生产者将序列化后的消息发送给目标broker。生产者与Kafka broker之间的通信通常基于TCP协议,生产者发送的消息会经过 Kafka 的网络层,最终到达目标 broker确认机制acks acks=0:生产者不等待确认,发送后就认为消息发送成功。acks=1:生产者等待 leader 节点的确认,消息在 leader 节点写入成功后返回确认。acks=all(或 acks=-1):生产者等待所有副本的确认,只有所有副本都成功写入后才返回确认,保证消息的持久性。 错误处理与重试,如果消息发送失败,Kafka 生产者会根据 retries 配置进行重试 生产数据可靠性

主要设置acks应答级别可以解决

acks=0:生产者不等待确认,发送后就认为消息发送成功。可靠性差,效率高acks=1:生产者等待 leader 节点的确认,消息在 leader 节点写入成功后返回确认。可靠性中等,效率中等acks=all(或 acks=-1):生产者等待所有副本的确认,只有所有副本都成功写入后才返回确认,保证消息的持久性。可靠性高,效率低,数据可能会重复 幂等性

幂等性是指Producer不论向Broker发送多少次重复数据,Broker只会持久化一条 重复数据的判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;SeqNumber是单调递增的 开启参数 enable.idempotence 默认为 true,false关闭

Broker 工作流程 Kafka 启动成功后,会在zk中注册注册的先后顺序可以争夺leader,broker 获取Controller权限选举出Controller之后,监听Brokers节点变化Controller决定Leader选举,选举规则:在isr中存活为前提。按照AR中排在前面的优先Controller将节点信息上传到ZK上其他Controller同步Leader Controller 信息

如果Brokers中的Leader挂了,根据第4点的选举规则,选举新的Leader,更新Leader,以及isr

Kafka副本

Kafka副本主要作用,提高数据可靠性 Kafka默认1个副本,生产环境一般配置2个 Kafka分区中的所有副本统称为AR(Assigned Repllicas) AR = isr + osr

isr: 表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据(30s),则该Follower将被踢出isr

osr: 表示Follower与Leader副本同步时,延迟过多的副本

Kafka 文件存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每一个partition对应一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每一个segment包含:“.index"文件、”.log"文件、".timeindex"文件。

Kafka存储日志的索引使用的是稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引

Kafka集群日志,默认7天后被清除,清楚策略有两种:删除、压缩策略

delete 删除日志,log.cleanup.policy=delete,基于时间或者基于大小compact 日志压缩,对于相同key的不同的value值,保留最后一个版本 Kafka高效读写数据 Kafka本身是分布式集群,可以采用分区技术,并行度高读数据采用稀疏索引,可以快速定位要消费的数据顺序写磁盘,Kafka的producer生产数据,要写入到log文件中,写的过程是一只追加到文件末端,为顺序写零拷贝和页缓存(操作系统缓存) 数据积压,提高Kafka吞吐量 增加分区,增加消费者个数提高生产者吞吐量,修改参数配置 batch.size批次大小,默认16klinger.ms等待时间,默认0mscompression.type压缩snappyRecordAccumulator缓冲区大小,默认32m 提高消费者吞吐量,修改参数配置 fetch.max.bytes 增加消费者一次拉取的消息条数,默认50mmax.poll.records 增加一次poll拉取数据返回消息的最大条数 Consumer 消费方式 pull(拉)模式,消费只主动拉去消息,Kafka采用这一个push(推)模式,集群推送消息到消费者,网络消耗比较大 常用操作

可以使用kafka-console-consumer.sh查看生产者的命令,常用命令如下

参数描述--bootstrap-server <String: server to connect to>连接的 Kafka Broker 主机名称和端口号。--topic <String: topic>操作的主题名称。--from-beginning从头开始消费--group <String: consumer group id>指定消费者组名称 # 接收消息 ## 注意如果没有--from-beginning 参数,不会获取到订阅之前的消息 ## 先订阅再有消息 kafka-console-consumer.sh --bootstrap-server kafka2:9094 --topic first --from-beginning Consumer Group 消费者组

由多个consumer组成,形成一个消费者组的条件,消费者组的groupid相同

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费消费者组之间互不影响。所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者 消费消息流程 配置消费者配置,集群地址、key和value的反序列化、组名消费者订阅主题消费者初始化,分区分配消费者拉取数据(poll循环)消费数据(消息处理)提交偏移量,分为自动提交,和手动提交,手动提交有分为同步提交和异步提交消费者故障和再平衡 消费者组初始化流程

使用coordinator辅助实现消费者组的初始化和分区,每一个broker都有coordinator。选择一个coordinator作为集群的leader

每一个消费者都会发送一个JoinGroup请求,发送至coordinator leader随机选一个consumer作为leader把要消费的topic情况发送给消费者leader消费者leader制定消费方案把消费方案发给coordinatorcoordinator把消费方案发送给各个consumer每一个消费者都会和coordinator保持心跳(3ms),一旦超时(45s),该消费者会被移除,并触发再平衡(重新分配任务),或者消费者处理消息的时间过长(超过5分钟) 消费者组详细消费流程 消费者发送消费请求到ConsumerNetworkClientConsumerNetworkClient会每批次抓数据,当数据达到fetch.min.bytes大小或者fetch.max.wat.ms时间抓去数据成功回调,进过序列化到达消费者 消费者分区分配

一个consumer group中有多个consumer组成,一个topic有多个partition组成,哪一个consumer来消费哪个partition的数据 Kafka有四种主流分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky

通过partition.assignment.strategy配置分区分配策略,默认策略是Range+CooperativeSticky

Range分区

对于每一个topic都是“均分”,将分区均分给消费者,余的分区分给靠前的消费者

例如:8个分区,分给三个消费者,前两个消费者分的3个分区,最后一个消费者分两个分区

缺点:在45s内停掉消费者A,随机由一个分区会直接承担消费者A的分区数据。从而造成数据倾斜

这里的45s是指分区coordinator的心跳机制,会重新分配

RoundRobin

RoundRobin 针对所有Topic去轮训分区策略,把所有partition和所有的consumer都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配

例如:8个分区,分给三个消费者A、B、C A:0、3、6 B:1、4、7 C:2、5

停掉A分区之后,45秒内分区产生的数据,消费者A负责的分区会被轮询的分给B、C

Sticky 分区

粘性分区定义:分配的结果带有“粘性的”,即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动。将分区随机均匀的分配给所有的消费者。

offset

offset 保存着消费者消费数据的位移,从0.9版本开支,consumer将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets中,在此之前的版本,是存储在Zookeeper中。保存在Kafka中的目的是降低I/O…

_consumer_offsets主题里面采用key和value的方法存储数据 key是group.id+topic+分区号,value是当前offset的值

修改配置文件consumer.properties,添加exclude.internal.topics=false,默认是true,表示不能消费系统主题。查看事例如下

# 创建一个新的topic kafka-topics.sh --bootstrap.server kafka1:9093 --create --topic test_offset --partitions 2 --replication-factor 2 # 启动生产者,并生产数据 kafka-console-producer.sh --topic test_offset --bootstrap.server kafka1:9093 # 启动消费者消费数据 kafka-console-consumer.sh --bootstrap.server kafka1:9093 --topic test_offset --group test # 查看消费者消费主题 kafka-console-consumer.sh --bootstrap.server kafka1:9093 --topic _consumer_offsets --consumer.config consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetMessageFormatter" --from beginning

自动保存和手动保存

消费者消费数据模式

auto.offset.reset=earliest|latest|none去指定消费者从什么位置开始消费

earliest: 自动将偏移量重置为最早的偏移量 --from-beginninglatest: 自动将偏移量重置为最新偏移量(默认)none: 如果未找到消费者组的先前偏移量,则向消费者抛出异常任意指定offset位移开始消费 常见问题 如何保证消息队列的高可用?

Kafka分布式部署,将多个Broker分散部署在多个机器上,每一个Broker上放一部分数据。创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据

如何保证消息不被重复消费?

这个问题就是在问如何保证消息消费的幂等性?

Kafka有offset偏移量这个概念,关闭自动提交,消费者消费完数据,手动提交

幂等性,要结合具体的业务操作,根据全局id去限制,存在则修改,不存在则新增。再加上Redis快速判别

如何保证消息的可靠性传输?

防止消息丢失,从三个角度出发

生产者 设置acks=-1和重试机制,确保消息发送到队列,并且同步至followerKafka消息队列 确保一主多从,生产者设置acks和重试次数消费者 关闭自动提交offset,手动提交,确保幂等性,防止重复操作 如何保证消息的顺序性?

把相关消息放在一个分区,单机消费者去消费

如何解决消息队列的延时以及过期失效问题? 优化消费者 增加消费者调整消费者配置,调整 fetch.max.bytes 和 fetch.max.wait.ms 配置项 设置消息队列合理的过期时间 log.retention.ms设置过期时间死信队列保存消息,后续分析 消息队列满了以后该怎么处理? 增加磁盘空间调整消息队列,日志清理策略合理控制生产者生产效率提高消费者消费效率 有几百万消息持续积压几小时,说说怎么解决? 修复消费者增加消费者提高消费者消费性能,单次消费数量 数据积压,提高Kafka吞吐量 增加分区,增加消费者个数提高生产者吞吐量,修改参数配置 batch.size批次大小,默认16klinger.ms等待时间,默认0mscompression.type压缩snappyRecordAccumulator缓冲区大小,默认32m 提高消费者吞吐量,修改参数配置 fetch.max.bytes 增加消费者一次拉取的消息条数,默认50mmax.poll.records 增加一次poll拉取数据返回消息的最大条数
标签:

Kafka集群,常见MQ面试问题由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Kafka集群,常见MQ面试问题