主页 > 电脑硬件  > 

【RocketMQ存储】CommitLogDispatcherBuildConsumeQueue构建Consum

【RocketMQ存储】CommitLogDispatcherBuildConsumeQueue构建Consum

文章目录 1. 前言2. ConsumeQueue 索引构建3. CommitLogDispatcherBuildConsumeQueue#dispatch4. ConsumeQueue 索引构建 - putMessagePositionInfo4.1 findConsumeQueue4.2 putMessagePositionInfoWrapper 构建索引文件消息4.3 putMessagePositionInfo 写入索引4.4 fillPreBlank 填充消息索引4.5 appendMessage 追加消息到 MappedByteBuffer 中 5. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

RocketMQ 存储部分系列文章:

【RocketMQ 存储】- RocketMQ存储类 MappedFile【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础【RocketMQ 存储】- broker 端存储单条消息的逻辑【RocketMQ 存储】- broker 端存储批量消息的逻辑【RocketMQ 存储】- 同步刷盘和异步刷盘【RocketMQ 存储】- 同步刷盘服务 GroupCommitService【RocketMQ 存储】- 异步刷盘服务 FlushRealTimeService【RocketMQ 存储】- 异步提交服务 CommitRealTimeService【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile【RocketMQ 存储】消息重放服务-ReputMessageService 2. ConsumeQueue 索引构建

在上一篇文章中,我们介绍了 ReputMessageService 服务是如何获取 CommitLog 里面的消息进行重放,那么这篇文章我们就讲解 CommitLog 里面的消息是如何重放构建成 ConsumeQueue 索引的,构建的 doDispatch 方法如下:

public void doDispatch(DispatchRequest req) { for (CommitLogDispatcher dispatcher : this.dispatcherList) { dispatcher.dispatch(req); } }

CommitLogDispatcher 是消息分发接口,里面的核心构建分发就是 dispatch。

public interface CommitLogDispatcher { /** * Dispatch messages from store to build consume queues, indexes, and filter data * @param request dispatch message request */ void dispatch(final DispatchRequest request); }

这个类有三个实现类,其中的 CommitLogDispatcherBuildConsumeQueue 就是构建 ConsumeQueue 索引的类。

那么在看具体源码之前,我们来回顾下 ConsumeQueue 的索引结构。

首先是 8 个字节的 CommitLog Offset,就是消息在 CommitLog 中的偏移量,通过这个偏移量可以快速定位到消息的存储位置,从而读取到消息。其次是消息的长度,其实读取 CommitLog 的时候 CommitLog 中记录的第一个字段就是消息的总长度,所以这个消息长度更多是用来更新一些偏移量,比如当前 CnnsumeQueue 下消息最大有效偏移量。最后是 tag hashcode,其实就是 tag 值的 hashCode,在 RocketMQ 中,消费者可以通过指定 tag 来订阅特定类型的消息,同样的可以通过这个 tag hashcode 快速过滤掉不是这个消费者订阅的 tag。 3. CommitLogDispatcherBuildConsumeQueue#dispatch

这个类就是构建 ConsumeQueue 索引类,下面就看下里面的构建方法。

/** * ConsumeQueue 构建服务 */ class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { // 获取消息的类型 sysFlag final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { // 非事务消息 case MessageSysFlag.TRANSACTION_NOT_TYPE: // 事务 commit 消息 case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 这就代表了这条消息可以用于构建 ConsumeQueue DefaultMessageStore.this.putMessagePositionInfo(request); break; // 事务一阶段 prepare 消息和事务一阶段 rollback 消息,不处理 case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }

这个方法逻辑很简单,就是根据方法的事务状态判断是否需要构建 ConsumeQueue 索引,对于一条消息,如果是事务一阶段的 prepare 或者 rollback,那么就说明这条消息还不能被消费者看见,或者说这条消息一阶段本地事务执行失败了需要回滚,这种情况肯定不能构建 ConsumeQueue 索引的。

那么对于普通消息或者说 commit 状态的消息就可以构建索引了。

4. ConsumeQueue 索引构建 - putMessagePositionInfo

下面就看下如何构建 ConsumeQueue 索引的,就是 putMessagePositionInfo 方法。

/** * 写入消息位置信息 * @param dispatchRequest */ public void putMessagePositionInfo(DispatchRequest dispatchRequest) { /** * 根据 topic 和队列 id 找到对应的 ConsumeQueue */ ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); /** * 构建 ConsumeQueue 索引文件消息 */ cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest)); }

这个方法里面主要就是两个逻辑

首先要构建 ConsumeQueue 索引,那么肯定需要找到这条消息的索引需要构建在哪个 topic 下面的哪个队列。找到 ConsumeQueue 后,再调用里面的 putMessagePositionInfoWrapper 方法去构建索引。

当然了这里面有一部分逻辑涉及到 LMQ,LMQ 是 RocketMQ 在 4.9.3 提出的轻量消息队列,主要面向小微设备消息传输的轻量级消息队列,支持 MQTT 协议,主要是对于 ConsumeQueue 的多队列分发构建索引,当然我们这里就不涉及这里面的源码讲解了。

4.1 findConsumeQueue

这个方法就是根据 topic 和队列 id 找到对应的 ConsumeQueue,RocketMQ 提供了一个 consumeQueueTable 队列集合,存储了 topic -> (queueId, ConsumeQueue) 的集合,大家知道一个 topic 下面是会分为多个 ConsumeQueue 的,所以可以根据这个集合去找。

// 从 consumeQueueTable 中根据 topic 获取 ConsumeQueue 集合 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);

如果说这个 map 是空的,这时候就创建一个集合设置到 map 中,不过感觉这里写的有点啰嗦了。

if (null == map) { // 这里面就是如果不存在集合就创建一个放到 consumeQueueTable 里面 ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); // putIfAbsent 意思是如果已经存在旧 map 了,那么 newMap 就不会设置到里面 // 同时返回旧 map,如果没有映射关系就把 newMap 设置进去 ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); // 这里意思就是如果原来就存在就设置旧的,否则就设置新的 if (oldMap != null) { map = oldMap; } else { map = newMap; } }

上面我们拿到了 map,这时候在 map 中通过 queueId 去找对应的 ConsumeQueue。

// 获取对应的 ConsumeQueue ConsumeQueue logic = map.get(queueId);

如果这个 ConsumeQueue 是空,那么新建立一个设置到 map 中。

// 如果是空,那么就重新建一个,这里说明 ConsumeQueue 是延时建立的 if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, // topic queueId, // queueId StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // ConsumeQueue 的存储路径 this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), // ConsumeQueue 的大小,一个 ConsumeQueue 可存储 30w 条数据,每条数据 20B this); // DefaultMessageStore // 存入 map 中,如果已存在就获取旧的 ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { // LMQ 的逻辑,ConsumeQueue 队列数 + 1 if (MixAll.isLmq(topic)) { lmqConsumeQueueNum.getAndIncrement(); } logic = newLogic; } }

最后返回创建的消息队列,这里感觉可以写得更简单的。

4.2 putMessagePositionInfoWrapper 构建索引文件消息

首先获取写入的最大重试次数 30 次,然后判断 ConsumeQueue 文件是否可写。

// 最大重试次数 30 次 final int maxRetries = 30; // 判断 ConsumeQueue 文件是否可写 boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

isCQWriteable 判断文件是否可写,当然这里的判断不是说简单用一个标记位来判断的。

public boolean isWriteable() { if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) { return true; } return false; }

flagBits 是 DefaultMessgeQueue 中专门用于记录存储的状态,比如

NOT_WRITEABLE_BIT 标记 ConsumeQueue 是否可写WRITE_LOGICS_QUEUE_ERROR_BIT 标记是否上一次写入的时候发生了错误DISK_FULL_BIT 标记磁盘是否满了WRITE_INDEX_FILE_ERROR_BIT 标记 IndexFile 的写入是否发生错误,这里说是写入,实际上是创建

当发生上面这些错误的时候就返回 false 表示不可继续往 Consume Queue 里面写入,否则就可以继续写入。

继续回到 putMessagePositionInfoWrapper 方法,往下看,写入 ConsumeQueue 的逻辑最多会重试 30 次,在 for 循环里面去重复执行。

// 重试 30 次 for (int i = 0; i < maxRetries && canWrite; i++) { ... }

在 for 循环中首先处理 ConsumeQueue 扩展消息的 tagsCode,扩展信息后面会出文章介绍,这里就不多赘述。

// 普通消息的 tagCode long tagsCode = request.getTagsCode(); // 是否支持 ConsumeQueue 扩展消息写入 if (isExtWriteEnable()) { // 扩展信息 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); cqExtUnit.setFilterBitMap(request.getBitMap()); cqExtUnit.setMsgStoreTime(request.getStoreTimestamp()); cqExtUnit.setTagsCode(request.getTagsCode()); long extAddr = this.consumeQueueExt.put(cqExtUnit); if (isExtAddr(extAddr)) { tagsCode = extAddr; } else { log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit, topic, queueId, request.getCommitLogOffset()); } }

接下来调用 putMessagePositionInfo 方法写入消息到 ConsumeQueue 中。

// 写入消息到 ConsumeQueue 中 boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());

如果写入成功,那么记录下最新存储时间。

if (result) { // 写入成功了 if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) { // 设置队列的物理消息最新消息存储时间 this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } // 设置队列的逻辑消息最新消息存储时间(ConsumeQueue 存储时间) this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); if (multiQueue) { multiDispatchLmqQueue(request, maxRetries); } return; }

这几个时间点都是消息存储的最新时间,RocketMQ 的 StockCheckPoint 文件就是根据这几个时间来进行 Broker 异常重启文件恢复的。

那如果写入失败,就睡眠 1s 重新执行,最多执行 30 次。

else { // XXX: warn and notify me log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset() + " failed, retry " + i + " times"); try { // 写入失败,睡眠 1s 再次执行 Thread.sleep(1000); } catch (InterruptedException e) { log.warn("", e); } }

在 for 循环执行之后如果都还没有写入成功,那么就标记写入 ConsumeQueue 失败,也就是标记 WRITE_LOGICS_QUEUE_ERROR_BIT。

// XXX: warn and notify me log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId); // 到这里就是 30 次都没能写入成功,这里就设置下存储状态是写入逻辑队列失败 this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

好了,上面就是这个方法的全部逻辑了,下面给出全部代码。

/** * 追加消息到 ConsumeQueue 中 * @param request * @param multiQueue */ public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) { // 最大重试次数 30 次 final int maxRetries = 30; // 判断 ConsumeQueue 文件是否可写 boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); // 重试 30 次 for (int i = 0; i < maxRetries && canWrite; i++) { // 普通消息的 tagCode long tagsCode = request.getTagsCode(); // 是否支持 ConsumeQueue 扩展消息写入 if (isExtWriteEnable()) { // 扩展信息 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); cqExtUnit.setFilterBitMap(request.getBitMap()); cqExtUnit.setMsgStoreTime(request.getStoreTimestamp()); cqExtUnit.setTagsCode(request.getTagsCode()); long extAddr = this.consumeQueueExt.put(cqExtUnit); if (isExtAddr(extAddr)) { tagsCode = extAddr; } else { log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit, topic, queueId, request.getCommitLogOffset()); } } // 写入消息到 ConsumeQueue 中 boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), tagsCode, request.getConsumeQueueOffset()); if (result) { // 写入成功了 if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) { // 设置队列的物理消息最新消息存储时间 this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } // 设置队列的逻辑消息最新消息存储时间(ConsumeQueue 存储时间) this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); if (multiQueue) { multiDispatchLmqQueue(request, maxRetries); } return; } else { // XXX: warn and notify me log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset() + " failed, retry " + i + " times"); try { // 写入失败,睡眠 1s 再次执行 Thread.sleep(1000); } catch (InterruptedException e) { log.warn("", e); } } } // XXX: warn and notify me log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId); // 到这里就是 30 次都没能写入成功,这里就设置下存储状态是写入逻辑队列失败 this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); } 4.3 putMessagePositionInfo 写入索引 /** * 写入消息到 ConsumeQueue 中 * @param offset 消息在 CommitLog 中的物理偏移量 * @param size 消息大小 * @param tagsCode 消息 tagsCode,普通消息是 hashCode,延时消息是消息投递到延时队列的时间 * @param cqOffset 消息在 ConsumeQueue 中的偏移量 * @return */ private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { ... }

首先判断下如果消息在 Commitlog 中的偏移量 + 消息大小 <= 已处理的消息的最大偏移量,大家注意 maxPhysicOffset 这个字段是 ConsumeQueue 中已经存储的消息在 CommitLog 中的最大物理偏移量,而不是 CommitLog 中的消息最大物理偏移量。所以如果满足这个条件,说明这条消息已经在 ConsumeQueue 中构建索引了,这时候就没必要继续处理了。

// 如果消息偏移量 + 消息大小 <= 已处理的消息的最大偏移量 if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); // 说明消息已经处理过了,不需要再处理了 return true; }

接下来就是往 ConsumeQueue 中写入索引了,那么首先我们就要把 ConsumeQueue 索引的消息给到一个临时 Buffer 中,这个 Buffer 就是 byteBufferIndex,其实就是写入下面图中索引的三个属性。

// 切换读模式,byteBufferIndex 是临时存储的 ByteBuffer,可以被反复利用,这里切换读模式就相当于将 position 重置了,然后就可以重新写入 this.byteBufferIndex.flip(); // 限制 20B this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); // 存入 8 个字节的消息物理偏移量(CommitLog 偏移量) this.byteBufferIndex.putLong(offset); // 存入 4 个字节的消息物理大小 this.byteBufferIndex.putInt(size); // 存入 8 个字节的 tagsCode,普通消息是消息 tags 的 hashCode,延时消息是消息投递到延时队列的时间 this.byteBufferIndex.putLong(tagsCode);

大家不要被 this.byteBufferIndex.flip() 给迷惑了,这个方法说是切换读模式,其实在里面就是将 position 重置,然后就可以从下标 0 开始继续写入了,byteBufferIndex 是一个 HeapByteBuffer。

我们知道索引是顺序写入 ConsumeQueue 文件中的,所以接下来要做的就是求出这个索引消息在 ConsumeQueue 中的偏移量。

// 消息在 ConsumeQueue 中的具体偏移量,因为 consumeQueueOffset 其实相当于是索引下标,所以要求出具体的偏移量还得 * 20 final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

consumeQueueOffset 这里要 * 20,是因为传过来的 cqOffset 是下标索引,其实就是当前这条索引是第几条 ConsumeQueue 索引,所以实际求起始偏移量的时候还得 * 20。

获取到偏移量之后,就根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的。这个方法在 【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile 中介绍过了,简单来说就是如果获取不到就提交创建 MappedFile 的请求。

// 根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

获取到这个文件之后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件,同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0,那么说明前面的 ConsumeQueue 索引有可能因为某种原因被删掉了,此时要写入的消息是第一条消息,所以就在这条消息之前把空出来的这部分填充一些条目上去。同时更新最新的刷盘和提交位置以及已处理的消息在 CommitLog 中的最小偏移量。

// 创建好了 MappedFile 后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件 // 同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0 // 说明当前消息是第一个文件的第一条消息,但是却不是从 0 位置开始写入,这种情况下这个文件前面部分就需要填充一些条目了 if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // 更新已处理的消息在 CommitLog 中的最小偏移量 this.minLogicOffset = expectLogicOffset; // 更新刷盘位置和已提交的位置 this.mappedFileQueue.setFlushedWhere(expectLogicOffset); this.mappedFileQueue.setCommittedWhere(expectLogicOffset); // 在 ByteBuffer 的 0-expectLogicOffset 位置填充消息索引条目 this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + mappedFile.getWrotePosition()); }

接着看下面的代码,如果不符合上面的条件,那么就判断下 cqOffset != 0,如果符合这个条件,就说明是顺序写入的,这时候计算出 MappedFile 中写指针 wrotePosition 的位置。

判断如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了如果这里不相等,那么说明 expectLogicOffset > currentLogicOffset,说明计算出来的写入位置在当前写指针 wrotePosition 后面,说明这个逻辑队列有可能出 BUG 了,一般都不会走到这里 // 这里就是顺着文件写入,所以需要判断下这个索引条目是否已经被构建过了 if (cqOffset != 0) { // 计算当前 MappedFile 的写指针所在的位置 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); // 如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了 if (expectLogicOffset < currentLogicOffset) { log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); return true; } // 如果这里不相等,那么说明这个队列顺序有可能是错的 // 注意这里是没有返回 false 的,就是说消息还是可以继续写入 if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset ); } }

上面如果发生 BUG 是没有返回 false 的,而是继续写入,而注意继续写入的位置就是 wrotePosition 而不是 cqOffset,相当于说忽略了 cqOffset。

但是在写入之前,记录下最大写入的 ConsumeQueue 索引的物理偏移量,大家要记住这个文件是顺序写入的。

this.maxPhysicOffset = offset + size; return mappedFile.appendMessage(this.byteBufferIndex.array());

好了,这个方法就解析到这里了,下面就给出所有的代码。

/** * 写入消息到 ConsumeQueue 中 * @param offset 消息在 CommitLog 中的物理偏移量 * @param size 消息大小 * @param tagsCode 消息 tagsCode,普通消息是 hashCode,延时消息是消息投递到延时队列的时间 * @param cqOffset 消息在 ConsumeQueue 中的偏移量 * @return */ private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { // 如果消息偏移量 + 消息大小 <= 已处理的消息的最大偏移量 if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); // 说明消息已经处理过了,不需要再处理了 return true; } // 切换读模式,byteBufferIndex 是临时存储的 ByteBuffer,可以被反复利用,这里切换读模式就相当于将 position 重置了,然后就可以重新写入 this.byteBufferIndex.flip(); // 限制 20B this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); // 存入 8 个字节的消息物理偏移量(CommitLog 偏移量) this.byteBufferIndex.putLong(offset); // 存入 4 个字节的消息物理大小 this.byteBufferIndex.putInt(size); // 存入 8 个字节的 tagsCode,普通消息是消息 tags 的 hashCode,延时消息是消息投递到延时队列的时间 this.byteBufferIndex.putLong(tagsCode); // 消息在 ConsumeQueue 中的具体偏移量,因为 consumeQueueOffset 其实相当于是索引下标,所以要求出具体的偏移量还得 * 20 final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; // 根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { // 创建好了 MappedFile 后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件 // 同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0 // 说明当前消息是第一个文件的第一条消息,但是却不是从 0 位置开始写入,这种情况下这个文件前面部分就需要填充一些条目了 if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // 更新已处理的消息在 CommitLog 中的最小偏移量 this.minLogicOffset = expectLogicOffset; // 更新刷盘位置和已提交的位置 this.mappedFileQueue.setFlushedWhere(expectLogicOffset); this.mappedFileQueue.setCommittedWhere(expectLogicOffset); // 在 ByteBuffer 的 0-expectLogicOffset 位置填充消息索引条目 this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + mappedFile.getWrotePosition()); } // 这里就是顺着文件写入,所以需要判断下这个索引条目是否已经被构建过了 if (cqOffset != 0) { // 计算当前 MappedFile 的写指针所在的位置 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); // 如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了 if (expectLogicOffset < currentLogicOffset) { log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); return true; } // 如果这里不相等,那么说明这个队列顺序有可能是错的 if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset ); } } this.maxPhysicOffset = offset + size; return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; } 4.4 fillPreBlank 填充消息索引

这个方法就是上面说的对 MappedFile 前面空出的部分进行填充,这个方法的逻辑就是填充 wrotePosition - untilWhere 这部分的空白。

private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) { ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); // 消息物理偏移量 byteBuffer.putLong(0L); // 消息长度 byteBuffer.putInt(Integer.MAX_VALUE); // 消息 tagsCode byteBuffer.putLong(0L); int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize()); for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) { mappedFile.appendMessage(byteBuffer.array()); } }

方法的逻辑不复杂,直接看代码就行了,要注意的是填入的索引条目:

物理偏移量:0消息长度:Integer.MAX_VALUE消息 tagsCode:0 4.5 appendMessage 追加消息到 MappedByteBuffer 中

这个方法在【RocketMQ 存储】- RocketMQ存储类 MappedFile 中已经解析过了,所以这里我就不再讲解,直接给出代码。

/** * 追加消息到 MappedByteBuffer 中,这里是 ConsumeQueue 的调用 * @param data 要写入的数据 * @return */ public boolean appendMessage(final byte[] data) { // 获取写指针的位置 int currentPos = this.wrotePosition.get(); // 判断写入这些数据之后会不会超出这个 MappedByteBuffer 的边界 if ((currentPos + data.length) <= this.fileSize) { try { // slice 获取这个 MappedByteBuffer 的视图 ByteBuffer buf = this.mappedByteBuffer.slice(); // 设置写入位置 buf.position(currentPos); // 写入数据到 MappedByteBuffer buf.put(data); } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); } // 重新设置写指针的位置 this.wrotePosition.addAndGet(data.length); return true; } return false; } 5. 小结

好了,到这里我们就讲解了 CommitLogDispatcherBuildConsumeQueue 是如何构建 ConsumeQueue 索引的,下一篇文章我们将会讲解 IndexFile 索引的构建。

如有错误,欢迎指出!!!

标签:

【RocketMQ存储】CommitLogDispatcherBuildConsumeQueue构建Consum由讯客互联电脑硬件栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【RocketMQ存储】CommitLogDispatcherBuildConsumeQueue构建Consum