什么是 zookeeper

ZooKeeper 是一个分布式的,开放源码的分布式应用程序协同服务。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

很多分布式协调服务都可以用 ZooKeeper 来做,其中典型应用场景如下:

  • 配置管理(configuration management):如果我们做普通的 Java 应用,一般配置项就是一个本地的配置文件,如果是微服务系统,各个独立服务都要使用集中化的配置管理,这个时候就需要 ZooKeeper。
  • DNS 服务。
  • 组成员管理(group membership):比如上面讲到的 HBase 其实就是用来做集群的组成员管理。
  • 各种分布式锁。

ZooKeeper 适用于存储和协同相关的关键数据,不适合用于大数据量存储。如果要存 KV 或者大量的业务数据,还是要用数据库或者其他 NoSql 来做。主要有以下两个原因:

  • 设计方面:ZooKeeper 需要把所有的数据(它的 data tree)加载到内存中。这就决定了 ZooKeeper 存储的数据量受内存的限制。这一点 ZooKeeper 和 Redis 比较像。一般的数据库系统例如 MySQL(使用 InnoDB 存储引擎的话)可以存储大于内存的数据,这是因为 InnoDB 是基于 B-Tree 的存储引擎。B-tree 存储引擎和 LSM 存储引擎都可以存储大于内存的数据量。
  • 工程方面:ZooKeeper 的设计目标是为协同服务提供数据存储,数据的高可用性和性能是最重要的系统指标,处理大数量不是 ZooKeeper 的首要目标。因此,ZooKeeper 不会对大数量存储做太多工程上的优化。

zookeeper 数据模型

ZooKeeper 的数据模型是层次模型。层次模型常见于文件系统。层次模型和 key-value 模型是两种主流的数据模型。ZooKeeper 使用文件系统模型主要基于以下两点考虑:

  • 文件系统的树形结构便于表达数据之间的层次关系。
  • 文件系统的树形结构便于为不同的应用分配独立的命名空间(namespace)。

ZooKeeper 的层次模型称作 data tree。Data tree 的每个节点叫做 znode。不同于文件系统,每个节点都可以保存数据。每个节点都有一个版本(version),版本从 0 开始计数。

znode

一个 znode 可以是持久性的,也可以是临时性的,znode 节点也可以是顺序性的。每一个顺序性的 znode 关联一个唯一的单调递增整数,因此 ZooKeeper 主要有以下 4 种 znode:

  • 持久性的 znode (PERSISTENT): ZooKeeper 宕机,或者 client 宕机,这个 znode 一旦创建就不会丢失。
  • 临时性的 znode (EPHEMERAL): ZooKeeper 宕机了,或者 client 在指定的 timeout 时间内没有连接 server,都会被认为丢失。
  • 持久顺序性的 znode (PERSISTENT_SEQUENTIAL): znode 除了具备持久性 znode 的特点之外,znode 的名字具备顺序性。
  • 临时顺序性的 znode (EPHEMERAL_SEQUENTIAL): znode 除了具备临时性 znode 的特点之外,znode 的名字具备顺序性。

在 Java 中操作 Zookeeper

依赖导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>

Zookeeper 原生 API 建立连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void zookeeperConnection() throws Exception {
try {
final CountDownLatch countDownLatch=new CountDownLatch(1);
ZooKeeper zooKeeper=
new ZooKeeper("127.0.0.1:2181,",
4000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected==event.getState()){
//如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
}
}
});
countDownLatch.await();
//CONNECTED
System.out.println(zooKeeper.getState());
// 建立新节点添加数据
zooKeeper.create("/nyatest","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
throw new Exception(e);
}
}

运行结果

QQ20250123-132039.png

Curator 建立连接并查询之前添加的数据

1
2
3
4
5
6
7
8
9
10
11
12
public void curatorConnection() throws Exception {
CuratorFramework curatorFramework= CuratorFrameworkFactory.
builder().connectString("127.0.0.1:2181").
sessionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000,3)).
namespace("").build();
curatorFramework.start();
Stat stat=new Stat();
//查询节点数据
byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/nyatest");
System.out.println(new String(bytes));
curatorFramework.close();
}

运行结果

QQ20250123-132323.png

Curator 创建节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void curatorCreate() throws Exception {
// 没有指定数据默认将ip作为数据存储
String path = client.create().forPath("/app1");
System.out.println(path);

// 指定数据,默认持久型
path = client.create().forPath("/app2", "nya".getBytes(StandardCharsets.UTF_8));
System.out.println(path);

// 设置节点类型
path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);

// 创建多级节点
path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(path);
}

运行结果

QQ20250123-141106.png

QQ20250123-141137.png

Curator 查询节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void curatorGet() throws Exception {
// get 命令
byte[] data = client.getData().forPath("/app2");
System.out.println(new String(data));

// 查询子节点 ls 命令
List<String> paths = client.getChildren().forPath("/");
System.out.println(paths);

// 查询状态信息 ls -s 命令
Stat stat = new Stat();
data = client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(new String(data));
System.out.println(stat.toString());
}

运行结果

QQ20250123-142350.png

Curator 修改数据

1
2
3
4
5
6
7
8
9
public void curatorSet() throws Exception {
client.setData().forPath("/", "hello".getBytes(StandardCharsets.UTF_8));

// 根据版本号修改
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion();
client.setData().withVersion(version).forPath("/app1", "2025".getBytes(StandardCharsets.UTF_8));
}

Curator 删除操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void curatorDelete() throws Exception {
// delete 命令
client.delete().forPath("/app1");
// 删除多级节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
// 确保删除成功
client.delete().guaranteed().forPath("/app2");
// 删除回调
client.delete().inBackground(new BackgroundCallback() {

@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("已删除");
}
}).forPath("/");
}

通过 Curator 实现服务注册和服务发现

注册中心可以对服务上下线做统一管理。每个工作服务器都可以作为数据的发布方向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息,当工作服务器的基本信息发生改变如上下线、服务器角色或服务范围变更,监控服务器可以得到通知并响应这些变化。服务自动注册与发现后,不再需要写死服务提供方地址,注册中心基于接口名查询服务提供者的 IP 地址,并且能够平滑添加或删除服务提供者。

Service Discovery

我们通常在调用服务的时候,需要知道服务的地址,端口,或者其他一些信息,通常情况下,我们是把他们写到程序里面,但是随着服务越来越多,维护起来也越来越费劲,更重要的是,由于地址都是在程序中配置的,我们根本不知道远程的服务是否可用,当我们增加或者删除服务,我们又需要到配置文件中配置么? 这时候,Zookeeper 帮大忙了,我们可以把我们的服务注册到 Zookeeper 中,创建一个临时节点(当连接断开之后,节点将被删除),存放我们的服务信息(url,ip,port 等信息),把这些临时节点都存放在以 serviceName 命名的节点下面,这样我们要获取某个服务的地址,只需要到 Zookeeper 中找到这个 path,然后就可以读取到里面存放的服务信息,这时候我们就可以根据这些信息调用我们的服务。这样,通过 Zookeeper 我们就做到了动态的添加和删除服务,做到了一旦一个服务时效,就会自动从 Zookeeper 中移除。

Curator Service Discovery 就是为了解决这个问题而生的,它对此抽象出了 ServiceInstance、ServiceProvider、ServiceDiscovery 三个接口,通过它我们可以很轻易的实现 Service Discovery。

ServiceInstance

Curator 中使用 ServiceInstance 作为一个服务实例,ServiceInstances 具有名称,ID,地址,端口和/或 ssl 端口以及可选的 payload(用户定义)。ServiceInstances 以下列方式序列化并存储在 ZooKeeper 中。

ServiceProvider

ServiceProvider 是主要的抽象类。它封装了发现服务为特定的命名服务和提供者策略。提供者策略方案是从一组给定的服务实例选择一个实例。有三个捆绑策略:轮询调度、随机和粘性(总是选择相同的一个)。ServiceProviders 是使用 ServiceProviderBuilder 分配的。消费者可以从从 ServiceDiscovery 获取 ServiceProviderBuilder(参见下文)。ServiceProviderBuilder 允许您设置服务名称和其他几个可选值。必须通过调用 start()来启动 ServiceProvider 。完成后,您应该调用 close()。