Zookeeper分布式锁实现
- 人工智能
- 2025-09-06 09:51:03

zookeeper最初设计的初衷就是为了保证分布式系统的一致性。本文将讲解如何利用zookeeper的临时顺序结点,实现分布式锁。
目录
1. 理论分析
1.1 结点类型
1.2 监听器
1.3 实现原理
2. 手写实现简易zookeeper分布式锁
1.1 依赖
1.2 常量定义
1.3 实现zookeeper分布式锁
1.4 使用方式
3. 引入Curator框架实现zookeeper分布式锁
2.1 框架依赖
2.2 使用方式
1. 理论分析
zookeeper 和Linux一样,采用目录树的方式管理结点,目录层级间以 / 区分
每个数据节点在 ZooKeeper 中被称为 znode,它是 ZooKeeper 中数据的最小单元。由于ZooKeeper 主要用于协调服务,出于性能和一致性考虑,每个节点的存放数据上限为1M
1.1 结点类型
znode有四种类型:
1.持久化结点 (PERSISTENT): 创建节点后一直存在
2. 持久化有序结点(PERSISTENT_SEQUENTIAL):在持久化结点的基础上,zookeeper会自动根据创建顺序,在结点名称后面加上一串序号
3. 临时结点(EPHEMERAL):在zookeeper与客户端失去连接后自动删除
4. 临时有序结点(EPHEMERAL_SEQUENTIAL):在临时结点的基础上,zookeeper会自动根据创建顺序,在结点名称后面加上一串序号
1.2 监听器Watcher 监听机制是 Zookeeper 中非常重要的特性。结点可以绑定监听事件,当监听事件发生的时候,Zookeeper会向客户端发送通知事件,执行监听器的回调方法。
1.3 实现原理
我们首先新建一个"/locks"的持久化结点,用来管理表示锁的子节点。(实际场景使用可以根据不同锁对象划分成更细致的持久化结点,比如"/locks/bilibili/comment/publish")
当用户尝试获取锁的时候,在"locks"结点下新建一个临时有序结点,例如"seq-00001"
新建结点成功后,系统进行检查,建立的结点是否是当前所有子节点中序号最小的一个
如果是最小的一个,说明用户是当前锁的持有者,往下执行业务逻辑,执行完成后摧毁临时结点
如果不是最小的一个,为了避免不断地自旋检查空耗性能,一般采用注册监听器的方式减少性能消耗:监听前一个结点的摧毁事件。如果用户持有的结点前面还有其他结点,说明用户不是持有的人,不能执行业务逻辑,应当阻塞等待;直到用户前一个结点被摧毁,说明轮到用户持有锁了,可以继续往下执行业务逻辑。
2. 手写实现简易zookeeper分布式锁 1.1 依赖 <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> <scope>test</scope> </dependency> <!--日志--> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!--zookeeper--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.6</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>RELEASE</version> <scope>compile</scope> </dependency> 1.2 常量定义 public interface ZkConstants { //连接地址 String connectString = "127.0.0.1:2181"; // 连接超时时间 int sessionTimeout = 2000; } 1.3 实现zookeeper分布式锁 public class DistributedLock { // zk客户端连接 private ZooKeeper zkClient; // 连接成功等待 private CountDownLatch connectLatch = new CountDownLatch(1); // 前一个结点(锁) private String waitPath; // 结点删除等待 private CountDownLatch waitLatch = new CountDownLatch(1); // 当前创建的结点(锁) private String createNode; /** * 构造方法:初始化客户端连接 * * @throws IOException * @throws InterruptedException * @throws KeeperException */ public DistributedLock() throws IOException, InterruptedException, KeeperException { //获取连接 zkClient = new ZooKeeper(ZkConstants.connectString, ZkConstants.sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //连接成功,释放countDownLatch if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } //前一个结点删除 if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { //解锁下一个结点 waitLatch.countDown(); } } }); //等待zk正常连接后,再往下执行 connectLatch.await(); //判断根节点/locks是否存在 Stat exists = zkClient.exists("/locks", false); if (exists == null) { //创建根节点 -- 持久结点 zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } /** * 加锁 * * @throws InterruptedException * @throws KeeperException */ public void zkLock() throws InterruptedException, KeeperException { //创建对应的临时带序号结点 createNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //判断创建节点是否是序号最小的结点 List<String> children = zkClient.getChildren("/locks", false); if (children.size() == 1) { return; } else { //排序结点以得到当前创建结点的序号(等待锁的序位) Collections.sort(children); //获取生成的临时结点序号 String thisNode = createNode.substring("/locks/".length()); //获得排序 int index = children.indexOf(thisNode); if (index == -1) { System.out.println("数据异常"); } else if (index == 0) { //最小序号结点,直接获取锁 return; } else { //监听序号前一个结点 waitPath = "/locks/" + children.get(index - 1); //true代表使用创建zkClient时初始化的监听器 zkClient.getData(waitPath, true, null); waitLatch.await(); } } } /** * 解锁 * * @throws InterruptedException * @throws KeeperException */ public void zkUnLock() throws InterruptedException, KeeperException { //删除临时带序号结点 zkClient.delete(createNode, -1); } } 1.4 使用方式 public class DistributedLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ExecutorService executorService = Executors.newFixedThreadPool(2); DistributedLock lock1 = new DistributedLock(); DistributedLock lock2 = new DistributedLock(); //多线程获取锁1 CompletableFuture.supplyAsync(() -> { try { lock1.zkLock(); System.out.println("线程" + Thread.currentThread().getName() + "获取到锁......"); Thread.sleep(5000); lock1.zkUnLock(); System.out.println("线程" + Thread.currentThread().getName() + "释放锁......"); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (KeeperException e) { throw new RuntimeException(e); } return true; }, executorService); //多线程获取锁2 CompletableFuture.supplyAsync(() -> { try { lock2.zkLock(); System.out.println("线程" + Thread.currentThread().getName() + "获取到锁......"); Thread.sleep(5000); lock2.zkUnLock(); System.out.println("线程" + Thread.currentThread().getName() + "释放锁......"); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (KeeperException e) { throw new RuntimeException(e); } return true; }, executorService); executorService.shutdown(); } } 3. 引入Curator框架实现zookeeper分布式锁
实际生产环境下,自然不可能手写这么多代码处理分布式锁,且不提很多地方的代码可复用,CountDownLatch反复处理带来的代码复杂性高,并且一些可重入锁、异常处理等逻辑上文也并没有完善。
生产场景中被广泛使用的zookeeper分布式锁的框架便是Curator
2.1 框架依赖 /.. 省略 ../ <!--Curator--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency> 2.2 使用方式 public class CuratorLockTest { public static void main(String[] args) { //创建分布式锁1 InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), "/locks"); InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), "/locks"); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(() -> { try { lock1.acquire(); System.out.println("线程1获取到锁"); //curator支持可重入锁 lock1.acquire(); System.out.println("线程1 再次获取到锁"); Thread.sleep(5000); lock1.release(); System.out.println("线程1 释放锁"); lock1.release(); System.out.println("线程1 再次释放锁"); } catch (Exception e) { throw new RuntimeException(e); } }); executorService.execute(() -> { try { lock2.acquire(); System.out.println("线程2获取到锁"); lock2.acquire(); System.out.println("线程2 再次获取到锁"); Thread.sleep(5000); lock2.release(); System.out.println("线程2 释放锁"); lock2.release(); System.out.println("线程2 再次释放锁"); } catch (Exception e) { throw new RuntimeException(e); } } ); executorService.shutdown(); } public static CuratorFramework getCuratorFramework() { //4秒超时,重试3次 ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString(ZkConstants.connectString) .connectionTimeoutMs(ZkConstants.sessionTimeout) .sessionTimeoutMs(ZkConstants.sessionTimeout) .retryPolicy(exponentialBackoffRetry) .build(); client.start(); System.out.println("zookeeper 启动成功..."); return client; } }希望能对大家理解zookeeper分布式锁有所帮助
Zookeeper分布式锁实现由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Zookeeper分布式锁实现”
上一篇
存储引擎---数据库