zookeeper常用操作
Easul Lv6

相关视频

黑马的zookeeper

基础概念

  • ZookeeperApache Hadoop的子项目,是一个树形目录服务。
  • Zookeeper是一个分布式的,开源的分布式应用程序协调服务(可用于管理分布式应用程序)
  • zookeeper主要提供的功能
    • 配置管理
      1
      2
      3
      4
      如多个机器的应用程序配置文件中配置了同一个数据库
      若数据库地址变了,就需要每个应用程序都去改配置信息,很麻烦
      所以可以设置配置中心,服务连接到配置中心,获取公有配置信息
      配置信息变了,在配置中心改即可,服务仍旧可以用,不需要修改
    • 分布式锁
      1
      2
      多个机器访问同一个数据,就需要在多个机器之间有一个锁,就是分布式锁
      访问数据之前,机器先获取分布式锁才能继续访问,未获取分布式锁的机器则等待
    • 集群管理
      1
      2
      也就是可以作为注册中心使用,Provider给Registry服务的信息
      Consumer需要访问Provider的服务,则从Registry获取服务的信息,再进行RPC访问

安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
cd ~/software
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
rm -rf zookeeper-3.4.6.tar.gz
cp ~/software/zookeeper-3.4.6/conf/zoo_sample.cfg ~/software/zookeeper-3.4.6/conf/zoo.cfg
vi ~/software/zookeeper-3.4.6/conf/zoo.cfg

# 数据的存储路径配置
# dataDir=/tmp/zookeeper
dataDir=/home/easul/software/hadoop-data/zk_data

# 添加环境变量
echo "
ZOOKEEPER_HOME=/home/easul/software/zookeeper-3.4.6
PATH=\$ZOOKEEPER_HOME/bin:\$PATH
export PATH
" >> ~/.bash_profile
source ~/.bash_profile
# 启动zookeeper
zkServer.sh start
# zkServer.sh stop: 停止zookeeper
# zkServer.sh status: 查看zookeeper状态

zookeeper的bin中脚本解释

1
2
zkCli: zookeeper客户端脚本,可用于改造zookeeper中的数据,
zkServer: zookeeper服务端脚本,用于zookeeper的启动,停止等操作

集群管理配置管理不需要写代码,可用zkCli.sh维护,分布式锁可能需要写代码

命令操作

数据模型

image

  • Zookeeper是一个树形目录服务,数据模型和Unix的文件系统目录树类似,有层次化结构
  • 每个结点是ZNode,每个结点都会保存自己的数据和结点信息
  • 结点可以有子结点,允许少量(1MB)数据存储在该结点
  • 结点的四大类
    • PERSISTENT:持久化结点,zookeeper关了,数据也还在
    • EPHEMERAL:临时结点,-e,zookeeper关了,数据就没了,只在当前会话有效
    • PERSISTENT_SEQUENTIAL:持久化顺序结点,-s,创建的结点有编号
    • EPHEMERAL_SEQUENTIAL:临时顺序结点,-es,创建的结点有编号

服务端常用命令

1
2
3
4
5
6
7
8
# zookeeper启动
zkServer.sh start
# zookeeper查看状态
zkServer.sh status
# zookeeper停止
zkServer.sh stop
# zookeeper重启
zkServer.sh restart

客户端常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 连接zookeeper服务器
# -server 指定zookeeper的服务器信息:ip+端口
zkCli.sh -server localhost:2181
# ========================================
# 连接到客户端后的命令
# 1. 退出
quit
# 2. 查看
# 2.1 查看根结点下有什么结点
ls /
# 2.2 查看其他结点下有什么结点,必须要从/,即根结点开始,后边加上所有层级
ls /zookeeper
# 2.3 查看某结点的详细信息,会显示所有子结点,以及结点的一些参数
ls -s /zookeeper
# 3. 创建,默认创建的都是持久化的结点
# 3.1 创建结点到某个目录,不赋值数据
create /app1
# 3.2 创建结点到某个目录,并赋值给结点数据
create /app1 easul_test
# 3.3 不能创建重复的结点,但可以创建子结点
create /app1/app11 child_test
# 3.4 加上参数创建临时结点,会话关掉之后(如命令行关了之后),结点消失
# -e 指定该结点为临时结点
create -e /app1/app12 child_test2
# 3.5 创建顺序结点,默认是持久化的结点
# 创建的结点后边带编号
create -s /app1/app13 child_test3
# 3.6 创建临时的顺序结点
create -es /app1/app14 child_test4
# 4. 获取数据
get /app1
# 5. 设置数据
set /app1 new_data
# 6. 删除结点,如果还有子结点则不能删除
delete /app1
# 7. 删除结点,如果还有子结点则一起删除
deleteall /app1
# 8. 帮助命令
help

结点详细信息的解释

1
2
3
4
5
6
7
8
9
10
11
czxid: 结点被创建的事务ID
ctime: 创建时间
mzxid: 最后一次被更新的事务ID
mtime: 修改时间
pzxid: 子结点列表最后一次被更新的事务ID
cversion: 子结点的版本号
dataversion: 数据版本号
aclversion: 权限版本号
ephemeralOwner: 用于临时结点,代表临时结点的事务ID,如果为持久结点,则为0
dataLength: 结点存储的数据的长度
numChildren: 当前结点的子结点个数

JavaAPI操作

Curator常识

  • Apache zookeeper的java客户端库
  • 常见zookeeper的Java API
    • 原生Java API(不好用)
    • ZkClient(原生Java API的简化封装)
    • Curator(最好用,简化了zookeeper客户端的使用)
  • 由Netfix研发,捐给了Apache,现为Apache顶级项目
  • 官网可以查看curator相关信息
    *
    • zookeeper3.5.xzookeeper3.4.x都用curator4.0以上

Curator常用操作

准备

创建maven的java项目

1
2
# 命令行或者其他方式创建java项目
mvn archetype:generate -DgroupId=ml.lightly -DartifactId=demo -DarchetypeArtifactId=maven-archetype-quickstart -Dversion=0.0.1-SNAPSHOT -DinteractiveMode=false

pom.xml依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!-- curator依赖 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
<!-- zookeeper3.4.x需要加这个依赖的排除,不过加了可能也报错。3.5.x不需要加 -->
<!-- <exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions> -->
</dependency>
<!-- 日志依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-alpha5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.0-alpha5</version>
<scope>test</scope>
</dependency>

日志配置文件log4j.properties,放到resources

1
2
3
4
5
log4j.rootLogger = off,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

CuratorTest.java,放到test目录

1
2
3
4
5
package ml.lightly.curator;

public class CuratorTest {

}

建立连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;

public class CuratorTest {
@Test
public void testConnect() {
// 创建重试策略,可通过右键RetryPolicy接口查看其实现类
// baseSleepTimeMs 重试间隔
// maxRetries 最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
// 第一种创建方式,使用newClient
// CuratorFrameworkFactory的newClient是创建新客户端
// curator这个java客户端和zookeeper建立连接的类
// connectString zookeeper服务端的地址:ip+port。集群有多个地址,就用,隔开
// sessionTimeoutMs 会话超时,curator和zookeeper多长时间没联系就超时
// connectionTimeoutMs 连接超时,curator和zookeeper多长时间没连上就超时
// retryPolicy 重试策略,
// CuratorFramework client = CuratorFrameworkFactory.newClient("master:2181,salve1:2181,slave2:2181", 60 * 1000,
// 15 * 1000, retryPolicy);

// 第二种创建方式,使用builder()创建
// CuratorFrameworkFactory的builder()是用链式方式来创建
// 可以指定名称空间,也就是默认的根目录
// 以后所有的操作都会认为根目录是这个,用于zookeeper中结点的隔离
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("master:2181,salve1:2181,slave2:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("lightly")
.build();
// 开启连接
client.start();
}
}

创建结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorTest {
private CuratorFramework client;

// 加了Before的方法,会在所有的Test方法前执行
// 从而保证client被赋值
@Before
public void testConnect() {
// 创建重试策略,可通过右键RetryPolicy接口查看其实现类
// baseSleepTimeMs 重试间隔
// maxRetries 最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
// 第一种创建方式,使用newClient
// CuratorFrameworkFactory的newClient是创建新客户端
// curator这个java客户端和zookeeper建立连接的类
// connectString zookeeper服务端的地址:ip+port。集群有多个地址,就用,隔开
// sessionTimeoutMs 会话超时,curator和zookeeper多长时间没联系就超时
// connectionTimeoutMs 连接超时,curator和zookeeper多长时间没连上就超时
// retryPolicy 重试策略,
// CuratorFramework client = CuratorFrameworkFactory.newClient("master:2181,salve1:2181,slave2:2181", 60 * 1000,
// 15 * 1000, retryPolicy);

// 第二种创建方式,使用builder()创建
// CuratorFrameworkFactory的builder()是用链式方式来创建
// 可以指定名称空间,也就是默认的根目录
// 以后所有的操作都会认为根目录是这个,用于zookeeper中结点的隔离
client = CuratorFrameworkFactory.builder()
.connectString("master:2181,slave1:2181,slave2:2181")
.sessionTimeoutMs(10 * 1000)
.connectionTimeoutMs(5 * 1000)
.retryPolicy(retryPolicy)
.namespace("lightly")
.build();
// 开启连接
client.start();
}

@Test
public void testCreate() throws Exception {
// 1. 创建一个空结点,返回结点路径,这里会创建到testConnect的命令空间里
// 创建的结点不指定数据,默认放的是客户端的IP
String path = client.create().forPath("/xxx");
System.out.println(path);

// 2. 创建带数据的结点,返回结点路径,这里会创建到testConnect的命令空间里
String path1 = client.create().forPath("/xxx1", "asdf".getBytes());
System.out.println(path1);

// 3. 设置节点的类型,类型通过CreateMode的枚举指定
// 这里设置的是临时结点,Java API会话结束后结点就被删了
String path2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/xxx2", "asdf".getBytes());
System.out.println(path2);

// 4. 创建多级结点
// 直接创建多级结点会报错,需要加上creatingParentContainersIfNeeded(),需要创建父节点就会自动创建
String path3 = client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath("/xxx3", "asdf".getBytes());
System.out.println(path3);
}

// 加了After的方法会在Test方法都执行后执行
// 这里可用于释放资源
@After
public void testClose() {
if (client != null) {
client.close();
}
}
}

查询结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testGet() throws Exception {
// 1. 查询数据
byte[] data = client.getData().forPath("/xxx1");
// 因为byte[]也相当于是一个对象,直接输出就相当于输出 类型@hash值
// 所以可以将byte[]数组转为字符串
System.out.println(new String(data));

// 2. 查询子结点
// 如果查命名空间下的子结点,需要写/,不能写空字符串
List<String> children = client.getChildren().forPath("/");
System.out.println(children);

// 3. 查询结点状态信息
// 结点信息放到这个stat,也就是状态的对象里
Stat stat = new Stat();
// 获取数据后设置存储到哪里,然后指定获取谁的信息
client.getData().storingStatIn(stat).forPath("/xxx1");
System.out.println(stat);
}

修改结点

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testSet() throws Exception {
// 1. 设置数据,然后指定修改哪个结点和指定修改为什么
client.setData().forPath("/xxx1", "asdffda".getBytes());

// 2. 根据版本修改,防止多人同时操作一个数据,从而出现问题。(保证原子操作)
// 先查出结点版本,然后根据该版本号进行设置
// 设置数据的时候如果其他客户端已经修改了数据,版本号会变,则这里会设置失败,抛出错误
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/xxx1");
int version = stat.getVersion();
client.setData().withVersion(version).forPath("/xxx1", "new".getBytes());
}

删除结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test 
public void testDelete() throws Exception {
// 1. 删除单个结点
client.delete().forPath("/xxx");
// 2. 删除带有子结点的结点
client.delete().deletingChildrenIfNeeded().forPath("/");
// 3. 必须删除成功
// 例如网络不好,客户端的请求,服务端没有收到,所以需要再次请求
client.delete().guaranteed().forPath("/");
// 4. 删除后执行回调,会自动执行回调函数
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
// 有两个参数,一个是连接对象client,另一个是事件对象event
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("成功");
System.out.println(event);
}
}).forPath("/xxx1");
}

Watch事件监听

  • zookeeper允许客户端在指定结点注册一些watcher(监听器),服务端监听结点,一些特殊事件触发后,如结点数据更改,服务端就可以通知注册监听器的客户端
  • zookeeper的配置中心注册中心分布式锁都会依赖这个Watcher事件监听
  • zookeeper的Watcher事件监听可以实现发布,订阅功能。多个订阅者监听某一个结点,结点数据变化后,zookeeper服务端会通知所有订阅者
  • Curator引入了Cache来实现对zookeeper服务端事件的监听,Cache可以注册事件的监听也可以缓存一些信息。
  • Curator中zookeeper的三种watcher
    • NodeCache:只监听某一个特定结点
    • PathChildrenCache:监控一个结点的子结点,不包括孙子或重孙结点
    • TreeCache:监听整棵树的所有结点,即该结点与其所有子结点,也包括孙子或重孙结点

CuratorWatcherTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package ml.lightly.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorWatcherTest {
private CuratorFramework client;

// 加了Before的方法,会在所有的Test方法前执行
// 从而保证client被赋值
@Before
public void testConnect() {
// 创建重试策略,可通过右键RetryPolicy接口查看其实现类
// baseSleepTimeMs 重试间隔
// maxRetries 最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
// 第一种创建方式,使用newClient
// CuratorFrameworkFactory的newClient是创建新客户端
// curator这个java客户端和zookeeper建立连接的类
// connectString zookeeper服务端的地址:ip+port。集群有多个地址,就用,隔开
// sessionTimeoutMs 会话超时,curator和zookeeper多长时间没联系就超时
// connectionTimeoutMs 连接超时,curator和zookeeper多长时间没连上就超时
// retryPolicy 重试策略,
// CuratorFramework client = CuratorFrameworkFactory.newClient("master:2181,salve1:2181,slave2:2181", 60 * 1000,
// 15 * 1000, retryPolicy);

// 第二种创建方式,使用builder()创建
// CuratorFrameworkFactory的builder()是用链式方式来创建
// 可以指定名称空间,也就是默认的根目录
// 以后所有的操作都会认为根目录是这个,用于zookeeper中结点的隔离
client = CuratorFrameworkFactory.builder()
.connectString("master:2181,slave1:2181,slave2:2181")
.sessionTimeoutMs(10 * 1000)
.connectionTimeoutMs(5 * 1000)
.retryPolicy(retryPolicy)
.namespace("lightly")
.build();
// 开启连接
client.start();
}


// 加了After的方法会在Test方法都执行后执行
// 这里可用于释放资源
@After
public void testClose() {
if (client != null) {
client.close();
}
}

@Test
public void testNodeCache() throws Exception {
// 1. 创建NodeCache对象
// client 与zookeeper的连接
// path 监听的结点路径
// dataIsCompressed 设置数据是否压缩,默认不压缩。压缩后传输更快,但接收后需要解压缩。
// 下边的内部类需要使用这个数据,所以加final,防止数据变化
final NodeCache nodeCache = new NodeCache(client, "/xxx1", false);

// 2. 注册监听
// 获取监听器对象,并传入被zookeeper通知后要实现的方法
// lambda表达式如下
// nodeCache.getListenable().addListener(() -> System.out.println("监听器被通知后的操作"));
nodeCache.getListenable().addListener(new NodeCacheListener() {

@Override
public void nodeChanged() throws Exception {
// 结点变化包括数据变化,结点被删除,结点被删除后结点又被创建,创建结点等
System.out.println("结点变化后,监听器被通知后的操作");

// 获取结点修改后的数据
// 还可以获取路径getPath(),获取状态getStat()
byte[] newData = nodeCache.getCurrentData().getData();
System.out.println(new String(newData));
}

});

// 3. 开启监听
// 设置为true,则开启监听时,加载缓冲数据
// nodeCache.start(true);
nodeCache.start(true);

// 防止在单元测试时代码直接执行完就退出了,从而无法获取到服务端的通知
while(true) {

}

}

@Test
public void testPathChildrenCache() throws Exception {
// 1. 创建PathChildrenCache对象
// client 与zookeeper的连接
// path 监听的结点路径
// cacheData 设置是否缓存数据到额外的Stat对象中
// dataIsCompressed 设置数据是否压缩,默认不压缩。压缩后传输更快,但接收后需要解压缩。
// executorService 内部提供的线程池。可以用默认的,也可以用自己的
// 下边的内部类需要使用这个数据,所以加final,防止数据变化
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true);

// 2. 注册监听
// 获取监听器对象,并传入被zookeeper通知后要实现的方法
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {

@Override
// 一个是连接对象client,另一个是事件对象event
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// 子结点变化会有影响,孙子或重孙结点变化没有影响,自己这个结点变化也没有影响
// 结点变化包括数据变化,结点被删除,结点被删除后结点又被创建,创建结点等
System.out.println("子结点变化后的操作");
// 连接后会有默认的重连事件,不需要进行处理
System.out.println(event);
// 根据子结点的操作类型进行处理,数据更新就输出数据。
// 因为是PathChildrenCacheEvent的内部类,就不再导入包了
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}

});

// 3. 开启监听
pathChildrenCache.start();

// 防止在单元测试时代码直接执行完就退出了,从而无法获取到服务端的通知
while(true) {

}
}

@Test
public void testTreeCache() throws Exception {
// 1. 创建TreeCache对象
// client 与zookeeper的连接
// path 监听的结点路径
// cacheData 设置是否缓存数据到额外的Stat对象中
// dataIsCompressed 设置数据是否压缩,默认不压缩。压缩后传输更快,但接收后需要解压缩。
// executorService 内部提供的线程池。可以用默认的,也可以用自己的
// 下边的内部类需要使用这个数据,所以加final,防止数据变化
final TreeCache treeCache = new TreeCache(client, "/");

// 2. 注册监听
// 获取监听器对象,并传入被zookeeper通知后要实现的方法
treeCache.getListenable().addListener(new TreeCacheListener() {

@Override
// 一个是连接对象client,另一个是事件对象event
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
// 自己和子结点和孙子结点等的变化都有会有影响
// 结点变化包括数据变化,结点被删除,结点被删除后结点又被创建,创建结点等
System.out.println("自己或子结点变化后的操作");
// 连接后会有默认的重连事件,不需要进行处理
System.out.println(event);
}


});

// 3. 开启监听
treeCache.start();

// 防止在单元测试时代码直接执行完就退出了,从而无法获取到服务端的通知
while(true) {

}
}
}

分布式锁

  • 情形:单机可以用syncronizedLock,因为在一个JVM,没有问题,但是多机器多个JVM,多线程锁就无法解决该问题了。所以用分布式锁,多个机器用同一个锁,从而解决多机器的多个线程同时访问一个数据的问题。
  • 用途:用于解决跨机器的进程之间数据同步问题
  • 分布式锁的实现
    • 基于缓存
      • Redis(性能高,但可能不可靠,有了集群后,master结点没有给slave结点同步数据就挂了,就可能多个人会获取锁)
      • memcached
    • 基于zookeeper(性能高,可靠)
      • Curator
    • 基于数据库实现,性能低
      • 悲观锁
      • 乐观锁
  • zookeeper分布式锁原理
    • 核心思想:当客户端要获取锁,就创建结点,使用完锁,就删除结点
      1. 客户端获取锁,就在某结点(如lock)下创建临时顺序结点(临时结点,会话断了就释放了,不会因为机器挂了很长时间占用资源。顺序可以保证序号小的先获取锁)
      2. 然后获取该结点下所有子结点,如果自己创建的结点序号最小,则认为获取到锁。使用完锁,删除刚刚创建的结点
      3. 如果发现自己创建的结点序号不是最小,则表示没获取到锁。然后找比自己小一个的结点,注册事件监听器,监听删除事件(3找2,2找1)
      4. 如果发现比自己小的结点被删除,客户端的Watcher会收到通知。此时再次判断自己创建的结点是不是所有子结点中最小的。是则获取锁,否则找比自己小一个的结点,注册事件监听器,监听删除事件

分布式锁的实现

Curator的五种锁

  • InterProcessSemaphoreMutex:分布式排他锁(非可重入锁)
    • 获取了锁,进入了资源。再想进入该资源,需先释放锁,再重新竞争锁
  • InterProcessMutex:分布式可重入排他锁
    • 获取了锁,进入了资源。再想进入该资源,可以直接进入。比较麻烦,重入几次就需要释放几次
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
    • 多个锁当做一个锁,需要都获得才行
  • InterProcessSemaphoreV2:共享信号量

12306的模拟售票。
LockTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
package ml.lightly.curator;

public class LockTest {
public static void main(String[] args) {
// 创建任务
Ticket12306 ticket12306 = new Ticket12306();
// 创建线程用于买票
Thread t1 = new Thread(ticket12306, "携程");
Thread t2 = new Thread(ticket12306, "飞猪");
t1.start();
t2.start();
}
}

Ticket12306.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package ml.lightly.curator;

import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Ticket12306 implements Runnable{
private int ticketCount = 10;
private InterProcessMutex lock;

public Ticket12306() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("master:2181,slave1:2181,slave2:2181")
.sessionTimeoutMs(10 * 1000)
.connectionTimeoutMs(5 * 1000)
.retryPolicy(retryPolicy)
.namespace("lightly")
.build();
// 开启连接
client.start();
lock = new InterProcessMutex(client, "/lock");
}

@Override
public void run() {
// 一直提供服务,所以使用使用while true
while (true) {
// 买票前获取锁
// time 获取锁时的等待时间
// unit 时间的单位
try {
lock.acquire(3, TimeUnit.SECONDS);
if (ticketCount > 0) {
System.out.println(Thread.currentThread() + "买了一张票");
Thread.sleep(100);
System.out.println("剩余" + (--ticketCount) + "张票");
} else {
break;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 买完票释放锁
try {
System.out.println(Thread.currentThread() + ":" + ticketCount);
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

}

zookeeper集群

Leader选举

  • Serverid:服务器的ID。给服务器的编号越大,选择算法中的权重越大
  • Zxid:数据ID。值越大,数据越新。在选举算法中数据越新,权重越大
  • Leader选举中,某台zookeeper获得超过半数选票,该zookeeper就是leader
    • 有1-5五台机器,按顺序启动,1先启动,只有自己,则选自己,不够半数,无法成为leader
    • 2启动,1和2都选2,不够半数无法成为leader
    • 3启动,1,2和3都选3,够半数,leader产生
    • 4,5启动也不再进行leader的重新选举了,3还是leader

集群搭建

真实集群:不同ip是不同的zookeeper
伪集群:同一台机器搭建多个zookeeper,用端口区分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 这是在三个机器下搭建
# ip分别定义在master,slave1,slave2
mkdir ~/software && cd ~/software
# 安装jdk
wget https://download.java.net/openjdk/jdk8u41/ri/openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz
tar -zxvf openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz
rm -rf openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz
# 在.bashrc或.bash_profile(centos7有,deepin可能没有)配置
echo "
export JAVA_HOME=/home/easul/software/java-se-8u41-ri
PATH=\$JAVA_HOME/bin:\$PATH
export PATH
# 配置jar包路径
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
" >> ~/.bash_profile

# 重新加载配置文件
source ~/.bash_profile

# 安装zookeeper
# 先在master结点
cd ~/software
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.5.6/zookeeper-3.5.6.tar.gz
tar -zxvf zookeeper-3.5.6.tar.gz
rm -rf zookeeper-3.5.6.tar.gz
cp ~/software/zookeeper-3.5.6/conf/zoo_sample.cfg ~/software/zookeeper-3.5.6/conf/zoo.cfg
vi ~/software/zookeeper-3.5.6/conf/zoo.cfg

# 设置zookeeper基本时间度量单位。客户端与服务器的心跳时间为1*tickTime,客户端会话超时时间为2*tickTime
# tickTime = 2000
# 数据的存储路径配置
# dataDir=/tmp/zookeeper
dataDir=/home/easul/software/hadoop-data/zk_data
# clientPort可用于更改zookeeper的客户端端口,实现伪集群,需要用不同端口
# clientPort=218x
# zookeeper服务器的配置,1, 2, 3分别对应的是master,slave1,slave2的myid
# 配置的解释为:server.服务器ID=服务器IP地址:集群内的机器通讯使用的端口:选举leader使用的端口
# 为client提供的端口为2181,集群默认通讯端口为2881,选举leader默认端口为3881
# 搭建伪集群,这里的集群通讯端口和选举leader端口也都需要改成不同的
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

# 创建zookeeper的数据文件夹
mkdir -p ~/software/hadoop-data/zk_data
# 创建myid,当作这个集群结点的唯一标识
echo "1" > ~/software/hadoop-data/zk_data/myid
# 配置master的环境变量
echo "
ZOOKEEPER_HOME=/home/easul/software/zookeeper-3.5.6
PATH=\$ZOOKEEPER_HOME/bin:\$PATH
export PATH
" >> ~/.bash_profile
source ~/.bash_profile
# 分发环境变量,zookeeper和数据文件夹到slave结点
scp ~/.bash_profile easul@slave1:~/
scp ~/.bash_profile easul@slave2:~/
scp -r ~/software/zookeeper-* easul@slave1:~/software
scp -r ~/software/zookeeper-* easul@slave2:~/software
scp -r ~/software/hadoop-data/zk_data easul@slave1:~/software/hadoop-data
scp -r ~/software/hadoop-data/zk_data easul@slave2:~/software/hadoop-data
# 配置slave结点的myid
echo "2" > ~/software/hadoop-data/zk_data/myid
echo "3" > ~/software/hadoop-data/zk_data/myid
# 刷新slave的配置文件
source ~/.bash_profile
# 在每一个结点启动zookeeper
zkServer.sh start
# zkServer.sh stop: 停止zookeeper
# zkServer.sh status: 查看zookeeper是leader还是follower
# jps有了QuorumPeerMain即启动成功
jps

故障模拟

1
2
3
4
5
6
# 用1,2,3三台机器顺序开启,2成为leader
# 关闭3这个follower,2还是leader
# 关闭1这个follower,1就不是leader,因为没有超过半数,zookeeper集群不再运行
# 再开启1,2成为了leader,集群又开始运行
# 再启动3号,2号还是leader
# 关闭2号leader,3号成为新的leader,zookeeper集群还是可以运行

集群角色

集群中所有zookeeper数据是一样的,只是角色不一样

  • Leader
    • 处理事务请求(增删改是事务请求,查询是非事务请求)
    • 是集群内部各个服务器的调度者
  • Follower
    • 处理客户端非事务请求,转发事务请求给Leader服务器
    • 参与leader的选举投票
  • Observer
    • 处理客户端非事务请求,转发事务请求给Leader服务器
    • 不参与leader的选举投票,主要用于分担Follower的非事务请求压力

如果client连接到Follower,请求删除结点,Follower就转发请求到Leader。Leader处理后,结点变少,就会给所有其他服务器同步数据,保证数据一致
如果client连接到Follower,请求查询,Follower就可以直接操作,返回请求

 评论