zookeeper【封神录】下篇
- 手机
- 2025-08-06 17:12:02

目录
🥞1.客户端API
🌭2.服务器动态上下线
🧂3.分布式锁
1.客户端API 1.1导入依赖 <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.14.1</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> </dependencies> 1.2代码实现 public class zkClient { //一定不要有空格 private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181"; private int sessionTimeOut = 2000; private ZooKeeper zkClient; /** * 初始话zookeeper * 参数1:连接地址 * 参数2:超时时间 * 参数3:监听器 */ @Before public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { List<String> children = null; try { children = zkClient.getChildren("/", true); } catch (Exception e) { e.printStackTrace(); } System.out.println("========================"); for (String child : children) { System.out.println(child); } System.out.println("========================"); } }); } /** * 创建子节点 * 参数1:创建节点的路径 * 参数2:节点的数据(转化为字节) * 参数3:节点的权限 * 参数4:节点的类型(临时/永久) **/ @Test public void create() throws InterruptedException, KeeperException { String nodeCreate = zkClient.create("/class", "s1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 监控子节点变化 * 参数1:要监控的节点目录 * 参数2:监听器(true:使用初始化是的监听器) */ @Test public void getChildren() throws InterruptedException, KeeperException { List<String> children = zkClient.getChildren("/", true); for (String child : children) { System.out.println(child); } //延时 Thread.sleep(Long.MAX_VALUE); } /** * 判断节点是否存在 * 参数1:判断的节点路径 * 参数:是否使用监听器 */ @Test public void isExist() throws InterruptedException, KeeperException { Stat stat = zkClient.exists("/class", false); System.out.println(stat == null ? "不存在" : "存在"); } } 1.3写数据原理 1.写入请求直接发送给Leader 1.客户端发请求给Leader2.leader执行请求并应答,然后把请求分发给下一个follower3.follower会执行请求并应答。4.当应答数超过半数,Leader就会回复客户端,完成了写请求5.leader会继续发送写请求给剩下的follower 2.写入请求发送给Follower 1.客户端发请求给follower,follower没有写权限,立即把写请求发给leader2.leader执行写请求并应答,然后把写请求分发给follower3.follower会执行请求并应答。4.当应答数超过半数,Leader回复follower,由follower回复客户端,完成了写请求5.leader会继续发送写请求给剩下的follower 2.服务器动态上下线 2.1客户端 1.获取zookeeper连接2.监听节点的变化3.业务逻辑(睡眠) public class DisClient { private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181"; private int sessionTimeOut = 2000; private ZooKeeper zooKeeper; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DisClient client = new DisClient(); //1.获取zk连接 client.getConnect(); //2.监听/servers下面的节点变化 client.getServerList(); //3.业务逻辑 client.business(); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } /** * 监听服务端(获取节点信息) * @throws InterruptedException * @throws KeeperException */ private void getServerList() throws InterruptedException, KeeperException { List<String> children = zooKeeper.getChildren("/servers", true); //服务器地址存放到集合中 ArrayList<String> list = new ArrayList<>(); for (String child : children) { byte[] data = zooKeeper.getData("/servers/" + child, false, null); list.add(new String(data)); } System.out.println(list); } /** * 初始话zookeeper * @throws IOException */ private void getConnect() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { getServerList(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }); } } 2.2服务端 1.获取zookeeper连接2.创建节点(服务端注册到zookeeper)3.业务逻辑(睡眠) /** * 服务端注册zookeeper */ public class DisServer { private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181"; private int sessionTimeOut = 2000; private ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { DisServer dIsServer = new DisServer(); //1.获取zk连接 dIsServer.getConnect(); //2.注册服务器到zk节点(创建节点) dIsServer.register(args[0]); //3.启动业务逻辑 dIsServer.business(); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } /** * 注册服务器(创建节点) * @param hostname * @throws InterruptedException * @throws KeeperException */ private void register(String hostname) throws InterruptedException, KeeperException { String create = zooKeeper.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + "已经上线"); } /** * 初始化zookeeper * @throws IOException */ private void getConnect() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } } 3.分布式锁
Zookeeper的分布式锁实现基于其znode(zk节点)功能。每个znode都可以有数据和子节点,并且每个znode都有一个版本号。Zookeeper的分布式锁利用了znode的版本号特性,同时使用watcher机制实现分布式锁的互斥
3.1执行流程 当一个客户端需要获取锁时,它会在Zookeeper上创建一个临时且有序的znode节点。客户端通过获取Zookeeper上的znode列表,并判断自己创建的节点是否是所有节点中最小的那个,如果是,则表示客户端获得了锁。如果客户端没有获得锁,则监听它前面(比它序号小的)的节点,等待锁的释放。当客户端释放锁时,它会删除自己创建的znode节点,此时,Zookeeper会通知正在等待前面的节点上的watcher机制,让等待锁的客户端尝试重新获取锁 3.2代码实现 /** * 分布式锁 */ public class ZkLock { private final String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181"; private final int sessionTimeOut = 2000; private final ZooKeeper zooKeeper; private String path; private CountDownLatch countDownLatch = new CountDownLatch(1); private CountDownLatch countPathLatch = new CountDownLatch(1); private String currentNode; //构造器初始化 public ZkLock() throws IOException, InterruptedException, KeeperException { //1.获取链接 zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //countDownLatch 连接上zookeeper,释放 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } //countPathLatch 释放 if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(path)) { countPathLatch.countDown(); } } }); //等待zookeeper正常连接后,往下执行程序 countDownLatch.await(); //2.判断根节点locks是否存在 Stat stat = zooKeeper.exists("/locks", false); if (stat == null) { //说明不存在,创建根节点 zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //3.加锁 public void zkLock() { //创建对应的临时带序号的节点 try { currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //判断创建的节点是否是最小的序号节点,如果是,获取到锁,如果不是,监听他前一个结点 List<String> children = zooKeeper.getChildren("/locks", false); //如果只有一个值,直接获取锁,如果不是,则判断 if (children.size() == 1) { //直接枷锁 return; } else { //对节点进行排序 Collections.sort(children); //获取节点名称 String thisNode = currentNode.substring("/locks/".length()); //通过界节点名称,获取在集合中的下标 int index = children.indexOf(thisNode); //判断下标 if (index == -1) { System.out.println("数据异常"); } else if (index == 0) {//第一个数据 //直接枷锁 return; } else {//说明多个节点,进行监听前一个节点 path = "/locks/" + children.get(index - 1); zooKeeper.getData(path, true, new Stat()); //等待监听 countPathLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } //4.解锁 public void unZkLock() { //删除节点 try { zooKeeper.delete(currentNode, -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } } 3.3线程测试 public class ZkLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZkLock zkLock1 = new ZkLock(); ZkLock zkLock2 = new ZkLock(); ZkLock zkLock3 = new ZkLock(); new Thread(new Runnable() { @Override public void run() { try { zkLock1.zkLock(); System.out.println("线程1,获取到锁"); Thread.sleep(3 * 1000); zkLock1.unZkLock(); System.out.println("线程1,释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { zkLock2.zkLock(); System.out.println("线程2,获取到锁"); Thread.sleep(3 * 1000); zkLock2.unZkLock(); System.out.println("线程2,释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { zkLock3.zkLock(); System.out.println("线程3,获取到锁"); Thread.sleep(3 * 1000); zkLock3.unZkLock(); System.out.println("线程3,释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } 4.Curator框架Curator是Apache ZooKeeper的一个高级客户端库,旨在使开发人员更容易编写可靠的分布式系统。它为ZooKeeper提供了许多有用的功能,包括连接管理,分布式锁和选举,缓存和观察。Curator还提供了一组易于使用的API,可以轻松管理ZooKeeper的节点和数据。
4.1添加依赖 <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.5.0</version> </dependency> 4.2代码实现 /** * 客户端连接 * @return */ private static CuratorFramework getCuratorFramework() { //创建zookeeper的客户端:重试策略,初始化每次重试之间需要等待的时间,基准等待时间为3秒 ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.20.129:2181,192.168.20.131:2181,192.168.20.130:2181") .connectionTimeoutMs(2000) .sessionTimeoutMs(2000) .retryPolicy(policy).build(); client.start(); System.out.println("zookeeper启动~"); return client; } 4.3创建线程测试 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks"); InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks"); new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("线程一获取到锁"); Thread.sleep(3000); lock1.release(); System.out.println("线程一释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("线程二获取到锁"); Thread.sleep(3000); lock2.release(); System.out.println("线程二释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); }zookeeper【封神录】下篇由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“zookeeper【封神录】下篇”
下一篇
滴水逆向1