kakfa-3:ISR机制、HWLEO、生产者、消费者、核心参数负载均衡
- 其他
- 2025-09-11 17:24:02

1. kafka内核原理 1.1 ISR机制 光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗? 不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。 ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。 如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有一个follower,其次就是在一条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺 1.2 HW&LEO原理
LEO
last end offset,日志末端偏移量,标识当前日志文件中下一条待写入的消息的offset。举一个例子,若LEO=10,那么表示在该副本日志上已经保存了10条消息,位移范围是[0,9]。HW
Highwatermark,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。任何一个副本对象的HW值一定不大于其LEO值。 小于或等于HW值的所有消息被认为是“已提交的”或“已备份的”。HW它的作用主要是用来判断副本的备份进度. 下图表示一个日志文件,这个日志文件中只有9条消息,第一条消息的offset(LogStartOffset)为0,最有一条消息的offset为8,offset为9的消息使用虚线表示的,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的。 leader持有的HW即为分区的HW,同时leader所在broker还保存了所有follower副本的leo (1)关系:leader的leo >= follower的leo >= leader保存的follower的leo >= leader的hw >= follower的hw (2)原理:上面关系反应出各个值的更新逻辑的先后更新LEO的机制
注意 follower副本的LEO保存在2个地方 (1)follower副本所在的broker缓存里。 (2)leader所在broker的缓存里,也就是leader所在broker的缓存上保存了该分区所有副本的LEO。更新LEO的时机
follower更新LEO (1)follower的leo更新时间 每当follower副本写入一条消息时,leo值会被更新 (2)leader端的follower副本的leo更新时间 当follower从leader处fetch消息时,leader获取follower的fetch请求中offset参数,更新保存在leader端follower的leo。 leader更新LEO (1)leader本身的leo的更新时间:leader向log写消息时更新HW的机制
follower更新HW
follower更新HW发生在其更新完LEO后,即follower向log写完数据,它就会尝试更新HW值。具体算法就是比较当前LEO(已更新)与fetch响应中leader的HW值,取两者的小者作为新的HW值。leader更新HW
leader更新HW的时机 (1)producer 向 leader 写消息时 (2)leader 处理 follower 的 fetch 请求时 (3)某副本成为leader时 (4)broker 崩溃导致副本被踢出ISR时 leader更新HW的方式 当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小的LEO值作为HW值。 这里的满足条件主要是指副本要满足以下两个条件之一: (1)处于ISR中 (2)副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默认值是10秒) 2. producer消息发送原理 2.1 producer核心流程概览1、ProducerInterceptors是一个拦截器,对发送的数据进行拦截
ps:说实话这个功能其实没啥用,我们即使真的要过滤,拦截一些消息,也不考虑使用它,我们直接发送数据之前自己用代码过滤即可2、Serializer 对消息的key和value进行序列化
3、通过使用分区器作用在每一条消息上,实现数据分发进行入到topic不同的分区中
4、RecordAccumulator收集消息,实现批量发送
按照分区构建不同的队列,将消息封装成一个个batch(16kb)
它是一个缓冲区,可以缓存一批数据,把topic的每一个分区数据存在一个队列中,然后封装消息成一个一个的batch批次,最后实现数据分批次批量发送。5、Sender线程从RecordAccumulator获取消息
6、构建ClientRequest对象
7、将ClientRequest交给 NetWorkClient准备发送
8、NetWorkClient 将请求放入到KafkaChannel的缓存
9、发送请求到kafka集群
10、调用回调函数,接受到响应
3. producer核心参数 回顾之前的producer生产者代码 package com.kaikeba.producer; import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * 需求:开发kafka生产者代码 */ public class KafkaProducerStudyDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { //准备配置属性 Properties props = new Properties(); //kafka集群地址 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //acks它代表消息确认机制 // 1 0 -1 all props.put("acks", "all"); //重试的次数 props.put("retries", 0); //批处理数据的大小,每次写入多少数据到topic props.put("batch.size", 16384); //可以延长多久发送数据 props.put("linger.ms", 1); //缓冲区的大小 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka mon.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka mon.serialization.StringSerializer"); //添加自定义分区函数 props.put("partitioner.class","com.kaikeba.partitioner.MyPartitioner"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 100; i++) { // 这是异步发送的模式 producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null) { // 消息发送成功 System.out.println("消息发送成功"); } else { // 消息发送失败,需要重新发送 } } }); // 这是同步发送的模式 //producer.send(record).get(); // 你要一直等待人家后续一系列的步骤都做完,发送消息之后 // 有了消息的回应返回给你,你这个方法才会退出来 } producer.close(); } } 3.1 常见异常处理不管是异步还是同步,都可能让你处理异常,常见的异常如下:
1)LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException 2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可 3)NetworkException:网络异常,重试即可 我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了。retries
重新发送数据的次数retry.backoff.ms
两次重试之间的时间间隔 3.2 提升消息吞吐量buffer.memory
设置发送消息的缓冲区,默认值是33554432,就是32MB 如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住compression.type
producer用于压缩数据的压缩类型。默认是none表示无压缩。可以指定gzip、snappy压缩最好用于批量处理,批量处理消息越多,压缩性能越好。batch.size
producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。默认是16384Bytes,即16kB,也就是一个batch满了16kB就发送出去 如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。linger.ms
这个值默认是0,就是消息必须立即被发送 一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kB,自然就会发送出去。 但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。 3.3 请求超时 max.request.size 这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些(企业一般设置成10M) request.timeout.ms 这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理 3.4 ACK参数acks参数,其实是控制发送出去的消息的持久化机制的。
acks=0
生产者只管发数据,不管消息是否写入成功到broker中,数据丢失的风险最高 producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了。 你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞吐量会提升很多,你发送弄一个batch出去,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。acks=1
只要leader写入成功,就认为消息成功了. 默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader.acks=all,或者 acks=-1
这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知. 这种方式数据最安全,但是性能最差。如果要想保证数据不丢失,得如下设置
(1)min.insync.replicas = 2 ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了。 (2)acks = -1 每次写成功一定是leader和follower都成功才可以算做成功,这样leader挂了,follower上是一定有这条数据,不会丢失。 (3)retries = Integer.MAX_VALUE 无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试。 除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失 3.5 重试乱序 max.in.flight.requests.per.connection 每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息 消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息 4. broker核心参数server.properties配置文件核心参数
【broker.id】 每个broker都必须自己设置的一个唯一id 【log.dirs】 这个极为重要,kafka的所有数据就是写入这个目录下的磁盘文件中的,如果说机器上有多块物理硬盘,那么可以把多个目录挂载到不同的物理硬盘上,然后这里可以设置多个目录,这样kafka可以数据分散到多块物理硬盘,多个硬盘的磁头可以并行写,这样可以提升吞吐量。 【zookeeper.connect】 连接kafka底层的zookeeper集群的 【Listeners】 broker监听客户端发起请求的端口号,默认是9092 【unclean.leader.election.enable】 默认是false,意思就是只能选举ISR列表里的follower成为新的leader,1.0版本后才设为false,之前都是true,允许非ISR列表的follower选举为新的leader 【delete.topic.enable】 默认true,允许删除topic 【log.retention.hours】 可以设置一下,要保留数据多少个小时(默认168小时),这个就是底层的磁盘文件,默认保留7天的数据,根据自己的需求来就行了【log.retention.hours】 可以设置一下,要保留数据多少个小时(默认168小时),这个就是底层的磁盘文件,默认保留7天的数据,根据自己的需求来就行了
【unclean.leader.election.enable】 默认是false,意思就是只能选举ISR列表里的follower成为新的leader,1.0版本后才设为false,之前都是true,允许非ISR列表的follower选举为新的leader
5. consumer消费原理 5.1 Offset管理 每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。所以后来就是提交offset发送给内部topic:__consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 __consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。
5.2 Coordinator 协调器Coordinator的作用
每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance. 根据内部的一个选择机制,会挑选一个对应的Broker,Kafka总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的. consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。如何选择哪台是coordinator
consumer-roup-id.hashcode % 分区数 =》分区号,该分区的leader所在的broker就是这个consumer group的coordinator。
首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。 比如说:groupId,"membership-consumer-group" -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区,consumer_offset的分区的副本数量默认来说1,只有一个leader,然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。 6. consumer消费者Rebalance策略 比如我们消费的一个topic主题有12个分区:p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11 假设我们的消费者组里面有三个消费者。 6.1 range范围策略 range策略就是按照partiton的序号范围 p0~3 consumer1 p4~7 consumer2 p8~11 consumer3 默认就是这个策略 6.2 round-robin轮循策略 consumer1: 0,3,6,9 consumer2: 1,4,7,10 consumer3: 2,5,8,11 但是前面的这两个方案有个问题: 假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3 这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上 6.3 sticky黏性策略 最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer 的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略 consumer1: 0-3 consumer2: 4-7 consumer3: 8-11 假设consumer3挂了 consumer1:0-3,+8,9 consumer2: 4-7,+10,11 7. consumer核心参数 【heartbeat.interval.ms】 默认值:3000 consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作 【session.timeout.ms】 默认值:10000 kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒 【max.poll.interval.ms】 默认值:300000 如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了 【fetch.max.bytes】 默认值:1048576 获取一条消息最大的字节数,一般建议设置大一些 【max.poll.records】 默认值:500条 一次poll返回消息的最大条数, 【connections.max.idle.ms】 默认值:540000 consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收 【auto.offset.reset】 earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 注:我们生产里面一般设置的是latest 【enable.auto mit】 默认值:true 设置为自动提交offset 【auto mit.interval.ms】 默认值:60 * 1000 每隔多久更新一下偏移量官网查看kafka参数http://kafka.apache.org/10/documentation.html
8. 指定位置开始消费1.从0开始消费
TopicPartition partition = new TopicPartition("order", 0); consumer.assign(Arrays.asList(partition)); consumer.seekToBeginning(Arrays.asList(partition)); while (true) { ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<Integer, String> record : poll) { System.out.println(record.key() + "-------" + record.value()); } }2.从指定位置开始消费
TopicPartition partition = new TopicPartition("order", 0); consumer.assign(Arrays.asList(partition)); //从指定位置开始消费 consumer.seek(partition, 5310); while (true) { ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<Integer, String> record : poll) { System.out.println(record.key() + "-------" + record.value()); } }9. 总结!!! 9.1、内核原理
ISR机制 :就是跟leader同步的follower的数量,默认再ISR列表中的follower才可以参与leader的选举,【unclean.leader.election.enable】参数默认是false,修改为true,就可以让不在ISR列表中的follower参与leader选举
HW:Highwatermark,标定了一个偏移量值,该偏移量之前的消息对消费者可见,可以消费。
LEO:last end offset,日志末端偏移量,标记当前log文件中下一条待写入的消息的offset。
HW<=LEO
更新机制
1、leader写log后更新自己的LEO
2、follower请求从leader拉去数据时带着自己LEO,leader根据这个跟更新这边follower副本的LEO
3、比较leader和副本取最小LEO更新作为leader的HW
4、follower得到同步数据包含leader的HW,写数据后更新自己的LEO,并得到的HW作为自己的HW更新
2、3、4会重复因为有多个follower会请求同步数据
9.2、生产者核心流程生产者主线程经过消息序列化和分区,发送到一个缓冲区Record Accumulator(32M),这个缓冲区中会依照partition数量来构建多个队列,并将消息封装成一个个batch(16kb)【涉及一个参数linger.ms】,发送给sender线程,sender线程将消息封装成一个request发送给kafka的channel,kafka做出响应。
9.3、生产者核心参数 吞吐量buffer.memory 默认 32MB 缓冲区Record Accumulator
batch.size 默认 16kb 缓冲区中的队列中的batch大小
linger.ms 默认是 0
假如设置为100ms,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch大小达到【batch.size的值】16kB,自然就会发送出去。 但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
compression.type 默认是none ,可以指定gzip、snappy,用于批量处理,批量处理消息越多,压缩性能越好。
ACK参数ack=0 生产者发送消息不做确认,直接视为发送成功
ack=1 只确认leader写入成功,视为发送成功
ack=all(或-1) 确认leader写入成功以及ISR中的副本都写入成功,才视为发送成功
retries参数只能在 ack=1和all的时候起作用
消息重试乱序:
max.in.flight.requests.per.connection 就是生产者给broker发送消息可以接受有多少条消息没有响应,默认是5
消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息
9.4、broker节点核心参数【log.retention.hours】 可以设置一下,要保留数据多少个小时(默认168小时),这个就是底层的磁盘文件,默认保留7天的数据,根据自己的需求来就行了
【unclean.leader.election.enable】 默认是false,意思就是只能选举ISR列表里的follower成为新的leader,1.0版本后才设为false,之前都是true,允许非ISR列表的follower选举为新的leader
9.5、消费者offset偏移量管理【auto.offset.reset】
【enable.auto mit】
【auto mit.interval.ms】
0.8前存在zk上,后面存在kafka自身的一个topic【consumer_offsets】里
一般不自动提交offset,
9.6、消费者Coordinator协调器负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance.
consumer-roup-id.hashcode % 分区数 =》分区号,该分区的leader所在的broker就是这个consumer group的coordinator。
9.7、消费者负载均衡rebalance策略范围策略,连续平分
轮循策略
黏性策略,在负载均衡的基础上如果有消费者宕机,保证其他消费者消费分区不变,将宕机的消费者的分区平分
kakfa-3:ISR机制、HWLEO、生产者、消费者、核心参数负载均衡由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“kakfa-3:ISR机制、HWLEO、生产者、消费者、核心参数负载均衡”