Java中使用Zookeeper进行分布式协调服务

2025-04发布7次浏览

Java中使用Zookeeper进行分布式协调服务

一、引言

Zookeeper是一个开源的分布式协调服务,它为分布式应用提供了诸如配置维护、域名服务、分布式同步、组服务等通用功能。在Java开发中,通过Zookeeper可以实现分布式系统中的节点管理、锁机制、选举等功能。

二、Zookeeper的基本概念

  1. Znode:Zookeeper的数据模型是树形结构,每个节点称为一个Znode。
  2. Watcher:客户端可以在某个Znode上注册一个Watcher,当这个Znode发生变化时,Zookeeper会通知客户端。
  3. Session:客户端与Zookeeper服务器之间的会话。会话有一个超时时间,如果在这个时间内没有收到心跳包,会话将失效。
  4. ACL:访问控制列表,用于控制对Znode的访问权限。

三、Zookeeper的典型应用场景

  1. 分布式锁:通过创建临时顺序节点来实现分布式锁。
  2. 配置管理:将系统的配置信息存储在Zookeeper中,所有客户端都可以读取和监听配置的变化。
  3. 命名服务:类似于DNS,提供分布式系统中的名字到地址的映射。
  4. 分布式队列:通过创建临时顺序节点实现分布式队列。

四、实践步骤

下面我们将通过一个简单的Java程序来演示如何使用Zookeeper实现分布式锁。

1. 环境准备
  • 安装Zookeeper并启动服务。
  • 添加Zookeeper的依赖到项目中。如果你使用Maven,可以在pom.xml中添加以下依赖:
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
</dependency>
2. 编写代码

以下是一个简单的分布式锁实现:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock implements Watcher {
    private ZooKeeper zk;
    private String lockPath = "/lock";
    private String clientId;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public DistributedLock(String host, String clientId) throws IOException, InterruptedException {
        this.clientId = clientId;
        zk = new ZooKeeper(host, 5000, this);
        connectedSemaphore.await();
    }

    @Override
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }

    public void acquireLock() throws KeeperException, InterruptedException {
        Stat stat = zk.exists(lockPath, false);
        if (stat == null) {
            System.out.println(clientId + " is trying to acquire the lock...");
            zk.create(lockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(clientId + " has acquired the lock.");
        } else {
            System.out.println(clientId + " is waiting for the lock...");
            String waitPath = zk.create("/wait", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List<String> children = zk.getChildren("/wait", true);
            Collections.sort(children);
            if (!children.get(0).equals(waitPath.substring(1))) {
                waitForLock(waitPath, children.get(0));
            }
            System.out.println(clientId + " has acquired the lock.");
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        System.out.println(clientId + " is releasing the lock...");
        zk.delete(lockPath, 0);
        zk.close();
        System.out.println(clientId + " has released the lock.");
    }

    private void waitForLock(String currentWaitPath, String previousWaitPath) throws KeeperException, InterruptedException {
        Stat stat = zk.exists("/" + previousWaitPath, false);
        while (stat != null) {
            Thread.sleep(100);
            stat = zk.exists("/" + previousWaitPath, false);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributedLock lock1 = new DistributedLock("localhost:2181", "Client1");
        DistributedLock lock2 = new DistributedLock("localhost:2181", "Client2");

        new Thread(() -> {
            try {
                lock1.acquireLock();
                Thread.sleep(5000); // 模拟业务处理
                lock1.releaseLock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                lock2.acquireLock();
                Thread.sleep(5000); // 模拟业务处理
                lock2.releaseLock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

五、总结

通过上述代码,我们实现了基于Zookeeper的分布式锁。Zookeeper的强大之处在于其简单易用的API和高可靠性,使得开发者能够专注于业务逻辑而无需过多关注底层的分布式协调细节。