《Zookeeper分布式过程协同技术详解》读书笔记-2
- 其他
- 2025-09-04 00:57:01

目录 zk的一些内部原理和应用请求,事务和标识读写操作事务标识(zxid) 群首选举Zab协议(ZooKeeper Atomic Broadcast protocol)文件系统和监听通知机制分布式配置中心, 简单Demojava code 集群管理code 分布式锁 zk的一些内部原理和应用 请求,事务和标识 读写操作 只读请求(exists, getData,getChildren)会在zk服务其本地处理,然后返回给客户端(所以zk处理以只读请求为主要负载时,性能会很高,可以增加更多的服务器到zk集群,这样能处理更多的读请求)
因为ZooKeeper集群中所有的server节点都拥有相同的数据,所以读的时候可以在任意一台server节点上,客户端连接到集群中某一节点,读请求,然后直接返回。当然因为ZooKeeper协议的原因(一半以上的server节点都成功写入了数据,这次写请求便算是成功),读数据的时候可能会读到数据不是最新的server节点,所以比较推荐使用watch机制,在数据改变时,及时感知到
写操作(create, delete, setData) 将会被转发给群首(群首会执行相应请求,并形成状态更新,我们称为事务(Transaction)) Client向Zookeeper的server1发送一个写请求,客户端写数据到服务器1上;如果server1不是Leader,那么server1会把接收到的写请求转发给Leader;然后Leader会将写请求转发给每个server; server1和server2负责写数据,并且两个Follower的写入数据是一致的,保存相同的数据副本;server1和server2写数据成功后,通知Leader; 当Leader收到集群半数以上的节点写成功的消息后,说明该写操作执行成功; 这里是3台服务器,只要2台Follower服务器写成功就ok因为client访问的是server1,所以Leader会告知server1集群中数据写成功; 被访问的server1进一步通知client数据写成功,这时,客户端就知道整个写操作成功了 事务标识(zxid)当群首产生了一个事务,就会为该事务分配一个标识符,我们称之 为ZooKeeper会话ID(zxid),通过Zxid对事务进行标识,就可以按照群 首所指定的顺序在各个服务器中按序执行
zxid为一个long型(64位)整数,分为两部分:时间戳(epoch)部 分和计数器(counter)部分。每个部分为32位,在我们讨论zab(Zookeeper Atomic Broadcast )协议 时,我们就会发现时间戳(epoch)和计数器(counter)的具体作用, 我们通过该协议来广播各个服务器的状态变更信息。
eg:
setData 加上事务(版本和新数据值),一个事务为一个单位,以原子方式执行;ZooKeeper集群以事务方式运行,并确保所 有的变更操作以原子方式被执行,同时不会被其他事务所干扰;并不存在传统的关系数据库中所涉及的回滚机制,而是 确保事务的每一步操作都互不干扰
同时一个事务还具有幂等性,也就是说,我们可以对同一个事务执 行两次,我们得到的结果还是一样的,我们甚至还可以对多个事务执行 多次,同样也会得到一样的结果,前提是我们确保多个事务的执行顺序 每次都是一样的。事务的幂等性可以让我们在进行恢复处理时更加简单。
实际上,ZooKeeper的每个节点维护者两个Zxid值,为别为:cZxid、mZxid。
(1)cZxid: 是节点的创建时间所对应的Zxid格式时间戳。
(2)mZxid:是节点的修改时间所对应的Zxid格式时间戳。
高32位是epoch用来标识Leader关系是否改变,每次一个Leader被选出来,它都会有一个新的epoch。低32位是个递增计数。
群首选举群首为集群中的服务器选择出来的一个服务器,并会一直被集群所 认可。设置群首的目的是为了对客户端所发起的ZooKeeper状态变更请 求进行排序,包括:create、setData和delete操作。群首将每一个请求转 换为一个事务,将这些事务发送给追随者,确保集 群按照群首确定的顺序接受并处理这些事务。
每个服务器启动后进入LOOKING状态,开始选举一个新的群首或 查找已经存在的群首,如果群首已经存在,其他服务器就会通知这个新 启动的服务器,告知哪个服务器是群首,与此同时,新的服务器会与群 首建立连接,以确保自己的状态与群首一致。
如果集群中所有的服务器均处于LOOKING状态,这些服务器之间 就会进行通信来选举一个群首,通过信息交换对群首选举达成共识的选 择。在本次选举过程中胜出的服务器将进入LEADING状态,而集群中 其他服务器将会进入FOLLOWING状态。
当一个服务器进入LOOKING状态,就会发送向集群中每个服 务器发送一个通知消息,如下
投票<服务器标识,最近执行的事务的zxid信息> (vote<sid, zxid>)快速群首选举的快速指的是什么?
Zab协议(ZooKeeper Atomic Broadcast protocol)在接收到一个写请求操作后,追随者会将请求转发给群首,群首将探索性地执行该请求,并将执行结果以事务的方式对状态更新进行广播。一个事务中包含服务器需要执行变更的确切操作,当事务提交时, 服务器就会将这些变更反馈到数据树上,其中数据树为ZooKeeper用于 保存状态信息的数据结构(请参考DataTree类)。
之后我们需要面对的问题便是服务器如何确认一个事务是否已经提 交,由此引入了我们所采用的协议:Zab:ZooKeeper原子广播协议 (ZooKeeper Atomic Broadcast protocol)。
propose: leader 广播给 followersaccept: followers收到广播信息给leader发送确认消息commit: leader收到确认仲裁数量后发送消息给follower进行提交操作 具体如下:两个保障(依靠了zxid)
一个被选举的群首确保在提交完所有之前的时间戳内需要提交的 事务,之后才开始广播新的事务。在任何时间点,都不会出现两个被仲裁支持的群首。为了实现第一个需求,群首并不会马上处于活动状态,直到确保仲 裁数量的服务器认可这个群首新的时间戳值。一个时间戳的最初状态必 须包含所有的之前已经提交的事务,或者某些已经被其他服务器接受, 但尚未提交完成的事务。这一点非常重要,在群首进行时间戳e的任何 新的提案前,必须保证自时间戳开始值到时间戳e-1内的所有提案被提 交。如果一个提案消息处于时间戳e’<e,在群首处理时间戳e的第一个提 案消息前没有提交之前的这个提案,那么旧的提案将永远不会被提交。
文件系统和监听通知机制 分布式配置中心, 简单Demo 在zookeeper里增加一个目录节点,并把配置信息存储在里面(作为配置中心存储) 多个客户端能够读取到 服务端配置改变,客户端(所有注册监听的客户端)都能监听到事件 java code package demo; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import java.util.concurrent.CountDownLatch; /** * @Author mubi * @Date 2020/3/27 22:36 */ public class ZooKeeperProSync implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { // zookeeper配置数据的存放路径 String path = "/myconfig"; // 连接zookeeper并且注册一个监听器 zk = new ZooKeeper("127.0.0.1:2281", 5000, new ZooKeeperProSync()); // 等待zk连接成功的通知(等待connectedSemaphore.countDown()减少为0) connectedSemaphore.await(); // 获取path目录节点的配置数据,并注册对节点的监听 System.out.println(new String(zk.getData(path, true, stat))); Thread.sleep(Integer.MAX_VALUE); } @Override public void process(WatchedEvent event) { // zk连接成功通知事件 if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); }else if (event.getType() == EventType.NodeDataChanged) { // zk目录节点数据变化通知事件 try { System.out.println("配置已修改,新值为:" + new String(zk.getData(event.getPath(), true, stat))); // TODO 具体业务 } catch (Exception e) { } } } } } 集群管理集群管理原理:机器的加入/退出,选举leader节点
持久节点 / 临时节点有序节点 / 无序节点可以临时+有序构成server,然后最小编号节点作为leader节点
创建集群节点目录(持久节点) 启动服务,并加入和减少机器观察 code AppMaster package demo; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooKeeper; /** * @Author mubi * @Date 2020/3/27 23:43 */ public class AppMaster { private String clusterNode = "mycluster"; private ZooKeeper zk; private volatile List<String> serverList; public void connectZookeeper() throws Exception { // 注册全局默认watcher zk = new ZooKeeper("127.0.0.1:2281", 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged && ("/" + clusterNode).equals(event.getPath())) { try { updateServerList(); } catch (Exception e) { e.printStackTrace(); } } } }); updateServerList(); } private void updateServerList() throws Exception { List<String> newServerList = new ArrayList<String>(); // watcher注册后,只能监听事件一次,参数true表示继续使用默认watcher监听事件 List<String> subList = zk.getChildren("/" + clusterNode, true); for (String subNode : subList) { // 获取节点数据 byte[] data = zk.getData("/" + clusterNode + "/" + subNode, false, null); newServerList.add(new String(data, "utf-8")); } serverList = newServerList; System.out.println("server list updated: " + serverList); } public static void main(String[] args) throws Exception { AppMaster ac = new AppMaster(); ac.connectZookeeper(); Thread.sleep(Long.MAX_VALUE); } } AppServer package demo; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; /** * @Author mubi * @Date 2020/3/27 23:44 */ public class AppServer extends Thread { private String clusterNode = "mycluster"; private String serverNode = "server_address"; private String serverName; @Override public void run() { try { connectZookeeper(serverName); } catch (Exception e) { e.printStackTrace(); } } public void connectZookeeper(String address) throws Exception { ZooKeeper zk = new ZooKeeper("127.0.0.1:2281", 5000, new Watcher() { @Override public void process(WatchedEvent event) {} }); // 关键方法,创建包含自增长id名称的目录,这个方法支持了分布式锁的实现 // 四个参数: // 1、目录名称 2、目录文本信息 // 3、文件夹权限,Ids.OPEN_ACL_UNSAFE表示所有权限 // 4、目录类型,CreateMode.EPHEMERAL_SEQUENTIAL表示创建临时目录,session断开连接则目录自动删除 String createdPath = zk.create( "/" + clusterNode + "/" + serverNode, address.getBytes("utf-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("create: " + createdPath); Thread.sleep(Integer.MAX_VALUE); } public AppServer(String serverName) { this.serverName = serverName; } } Server1 package demo; /** * @Author mubi * @Date 2020/3/27 23:46 */ public class Server1 { public static void main(String[] args) throws Exception { AppServer server1 = new AppServer("Server1"); server1.start(); } } 分布式锁一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。
避免羊群效应:所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力。所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反应。
《Zookeeper分布式过程协同技术详解》读书笔记-2由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“《Zookeeper分布式过程协同技术详解》读书笔记-2”
下一篇
QtMSVC编译器报错C1060