主页 > IT业界  > 

RocketMq中RouteInfoManger组件的源码分析

RocketMq中RouteInfoManger组件的源码分析
1.前言

RouteInfoManager 是 RocketMQ 中 NameServer 的核心组件之一,主要负责管理和维护整个 RocketMQ 集群的路由元数据信息。里面包含一些非常核心的功能:存储和管理 Broker 信息(broker的注册,broker心跳的维护);维护 Topic 的路由信息(topic的创建和更新,topic路由信息的查询);管理队列信息,管理集群信息等。

2.内部数据结构 public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // broker长连接过期时间 长连接的空闲时间是2分钟 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //读写锁 private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 创建topic 以后 topic是逻辑上的概念 一个topic会有多个Queue Queue会分散到不同的broker上 private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable; // 代表的broker组的信息 BrokerData包含了一组Broker的信息 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // 一个NameServer可以管理多个broker组 通常来说一个Cluster就可以了 // 有可能会有很多复杂的业务场景 多个Cluster private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //管理Broker的长连接心跳 是否还有心跳 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // Filter Server 是rocketMQ的一个高级功能,用来过滤消息 //一般情况下 我们是可以基于tag进行数据筛选的操作,比较简单,没有办法进行更加细化的过滤 //这个Filter Server是在每台Broker机器上启动一个(或者多个)Filter Server //我们可以把一个自定义的消息筛选的class 上传到Filter server上,在进行数据消费的时候让Broker把数据先传输到Filter Server // Filter Server会根据你自定义的class来进行细粒度的数据筛选,把筛选后的数据回传给你的消费端 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; } 3.核心方法     3.1 getAllClusterInfo /** * 返回的是 broker的cluster信息 * 里面包含的是HashMap<String //brokerName// BrokerData> brokerAddrTable * HashMap<String //clusterName// , Set<String //brokerName// >> clusterAddrTable * @return */ public ClusterInfo getAllClusterInfo() { ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo(); clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable); clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable); return clusterInfoSerializeWrapper; } 3.2 deleteTopic /** * 删除某个topic 直接操作topicQueueTable的hashMap * @param topic */ public void deleteTopic(final String topic) { try { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.remove(topic); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("deleteTopic Exception", e); } } public void deleteTopic(final String topic, final String clusterName) { try { try { this.lock.writeLock().lockInterruptibly(); Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (brokerNames != null && !brokerNames.isEmpty()) { Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic); if (queueDataMap != null) { for (String brokerName : brokerNames) { final QueueData removedQD = queueDataMap.remove(brokerName); if (removedQD != null) { log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic, removedQD); } } if (queueDataMap.isEmpty()) { log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic); this.topicQueueTable.remove(topic); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("deleteTopic Exception", e); } } 3.3 getAllTopicList /** * 查询所有的topic的列表信息 * @return */ public TopicList getAllTopicList() { TopicList topicList = new TopicList(); try { try { this.lock.readLock().lockInterruptibly(); topicList.getTopicList().addAll(this.topicQueueTable.keySet()); } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("getAllTopicList Exception", e); } return topicList; } 3.4  registerBroker

   详细的注册流程 可以看我以前的博客:RocketMQ中的NameServer主要数据结构-CSDN博客

/** * broker的注册方法 * @param clusterName broker的集群名称 * @param brokerAddr broker的地址 * @param brokerName broker所属组的名称 * @param brokerId broker机器的id * @param haServerAddr broker的ha地址 * @param topicConfigWrapper 当前broker机器上包含的topic队列上的数据 * @param filterServerList broker上部署的filterServer的列表 * @param channel netty的网络长连接 * @return broker注册的结果 */ public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { //省略大部分代码 } 3.5 unregisterBroker /** * broker的下线逻辑处理 * @param clusterName 集群名 * @param brokerAddr 地址 * @param brokerName broker组的名字 * @param brokerId broker对应的id */ public void unregisterBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId) { try { try { //加锁 this.lock.writeLock().lockInterruptibly(); //获取brokerLiveInfo对象 获取保活信息 BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo != null ? "OK" : "Failed", brokerAddr ); //filterServerTable中删除broker的信息 this.filterServerTable.remove(brokerAddr); boolean removeBrokerName = false; //获取broker组中获取到brokerData信息 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { //根据brokerId 从brokerData中移除掉BrokerId对应的地址 String addr = brokerData.getBrokerAddrs().remove(brokerId); log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", addr != null ? "OK" : "Failed", brokerAddr ); //broker组中的机器数量如果为空的话 就移除掉这个broker组的信息 if (brokerData.getBrokerAddrs().isEmpty()) { this.brokerAddrTable.remove(brokerName); log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", brokerName ); removeBrokerName = true; } } //如果已经移除掉Broker组的信息的话 if (removeBrokerName) { //从集群中移除掉这个broker组 Set<String> nameSet = this.clusterAddrTable.get(clusterName); if (nameSet != null) { boolean removed = nameSet.remove(brokerName); log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", removed ? "OK" : "Failed", brokerName); //集群中的broker组的数量如果也为空的话 就移除掉这个集群的信息 if (nameSet.isEmpty()) { this.clusterAddrTable.remove(clusterName); log.info("unregisterBroker, remove cluster from clusterAddrTable {}", clusterName ); } } //根据broker的名字移除掉topic的信息 this.removeTopicByBrokerName(brokerName); } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("unregisterBroker Exception", e); } } /** * 根据broker的名字移除掉topic的信息 * @param brokerName */ private void removeTopicByBrokerName(final String brokerName) { Set<String> noBrokerRegisterTopic = new HashSet<>(); this.topicQueueTable.forEach((topic, queueDataMap) -> { QueueData old = queueDataMap.remove(brokerName); if (old != null) { log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old); } if (queueDataMap.size() == 0) { noBrokerRegisterTopic.add(topic); log.info("removeTopicByBrokerName, remove the topic all queue {}", topic); } }); noBrokerRegisterTopic.forEach(topicQueueTable::remove); } //获取topic的路由信息(broker的地址信息,以及在broker上的filterServer的列表) 针对一个topic里有多个queues来进行路由 public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; Set<String> brokerNameSet = new HashSet<>(); List<BrokerData> brokerDataList = new LinkedList<>(); topicRouteData.setBrokerDatas(brokerDataList); HashMap<String, List<String>> filterServerMap = new HashMap<>(); topicRouteData.setFilterServerTable(filterServerMap); try { try { //加一把读锁 this.lock.readLock().lockInterruptibly(); //从topicQueueTable中获取到topic对应的 QueueData Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic); if (queueDataMap != null) { topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values())); foundQueueData = true; //从queueData中获取到broker名字的set集合 brokerNameSet.addAll(queueDataMap.keySet()); for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; // skip if filter server table is empty if (!filterServerTable.isEmpty()) { for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); // only add filter server list when not null if (filterServerList != null) { filterServerMap.put(brokerAddr, filterServerList); } } } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; } 3.6 scanNotActiveBroker

  扫描出心跳超时的broker,并针对超时的broker进行下线的操作

public int scanNotActiveBroker() { // 这块的方法主要是brokerLiveTable的集合中的所有元素 //拿到broker最新一次的心跳时间 //broker的最新一次心跳时间+120s 小于 当前时间戳 //就把这个broker进行移除掉 int removeCount = 0; Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //关闭连接的channel通道信息 RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); //从内存中进行删除缓存的channel连接信息 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); removeCount++; } } return removeCount; } //从brokerLiveTable中删除掉broker的保活信息并进行清理掉内存中的保活信息 public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; //找到要进行删除的broker信息 if (channel != null) { try { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); } //下面的代码开始进行删除broker的信息 if (brokerAddrFound != null && brokerAddrFound.length() > 0) { try { try { this.lock.writeLock().lockInterruptibly(); //删除 brokerLiveTable中的broker信息 this.brokerLiveTable.remove(brokerAddrFound); //删除 filterServerTable中的broker信息 this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName = false; //删除broker组中的broker信息 Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } //如果删除broker完成之后 发现broker组的信息也为空 那就把broker组进行删除操作 if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } } //删除cluster集群的中的broker组信息 if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); String clusterName = entry.getKey(); Set<String> brokerNames = entry.getValue(); boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName); if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); it.remove(); } break; } } } //删除topic组在这个删除broker组中对应的信息也进行删除的操作 if (removeBrokerName) { String finalBrokerNameFound = brokerNameFound; Set<String> needRemoveTopic = new HashSet<>(); topicQueueTable.forEach((topic, queueDataMap) -> { QueueData old = queueDataMap.remove(finalBrokerNameFound); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, old); if (queueDataMap.size() == 0) { log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); needRemoveTopic.add(topic); } }); needRemoveTopic.forEach(topicQueueTable::remove); } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } }

标签:

RocketMq中RouteInfoManger组件的源码分析由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“RocketMq中RouteInfoManger组件的源码分析