10 基于 Etcd 的分布式锁实现原理及方案

Etcd 最新版本已经提供了支持分布式锁的基础接口(可见官网说明),但本文并不局限于此。

本文将介绍两条实现分布式锁的技术路线:

  1. 从分布式锁的原理出发,结合 Etcd 的特性,洞见分布式锁的实现细节;
  2. 基于 Etcd 提供的分布式锁基础接口进行封装,实现分布式锁。

两条路线差距甚远,建议读者先看路线 1,以便了解 Etcd 实现分布式锁的细节。

1. 为什么选择 Etcd

据官网介绍,Etcd 是一个分布式,可靠的 Key-Value 存储系统,主要用于存储分布式系统中的关键数据。初见之下,Etcd 与 NoSQL 数据库系统有几分相似,但作为数据库绝非 Etcd 所长,其读写性能远不如 MongoDB、Redis 等 Key-Value 存储系统。“让专业的人做专业的事!” Ectd 作为一个高可用的键值存储系统,有很多典型的应用场景,本文将介绍 Etcd 的优秀实践之一:分布式锁。

1.1 Etcd 优点

目前,可实现分布式锁的开源软件有很多,其中应用最广泛、大家最熟悉的应该就是 ZooKeeper,此外还有数据库、Redis、Chubby 等。但若从读写性能、可靠性、可用性、安全性和复杂度等方面综合考量,作为后起之秀的 Etcd 无疑是其中的 “佼佼者” 。它完全媲美业界“名宿”ZooKeeper,在有些方面,Etcd 甚至超越了 ZooKeeper,如 Etcd 采用的 Raft 协议就要比 ZooKeeper 采用的 Zab 协议简单、易理解。

Etcd 作为 CoreOS 开源项目,有以下的特点。

  • 简单:使用 Go 语言编写,部署简单;支持 cURL 方式的用户 API (HTTP+JSON),使用简单;开源 Java 客户端使用简单;
  • 安全:可选 SSL 证书认证;
  • 快速:在保证强一致性的同时,读写性能优秀,详情可查看官方提供的 Benchmark 数据
  • 可靠:采用 Raft 算法实现分布式系统数据的高可用性和强一致性。

1.2 分布式锁的基本原理

分布式环境下,多台机器上多个进程对同一个共享资源(数据、文件等)进行操作,如果不做互斥,就有可能出现“余额扣成负数”,或者“商品超卖”的情况。为了解决这个问题,需要分布式锁服务。首先,来看一下分布式锁应该具备哪些条件。

  • 互斥性:在任意时刻,对于同一个锁,只有一个客户端能持有,从而保证一个共享资源同一时间只能被一个客户端操作;
  • 安全性:即不会形成死锁,当一个客户端在持有锁的期间崩溃而没有主动解锁的情况下,其持有的锁也能够被正确释放,并保证后续其它客户端能加锁;
  • 可用性:当提供锁服务的节点发生宕机等不可恢复性故障时,“热备” 节点能够接替故障的节点继续提供服务,并保证自身持有的数据与故障节点一致。
  • 对称性:对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。

1.3 Etcd 实现分布式锁的基础

Etcd 的高可用性、强一致性不必多说,前面章节中已经阐明,本节主要介绍 Etcd 支持的以下机制:Watch 机制、Lease 机制、Revision 机制和 Prefix 机制,正是这些机制赋予了 Etcd 实现分布式锁的能力。

  • Lease 机制:即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,以避免 Key-Value 对过期失效。Lease 机制可以保证分布式锁的安全性,为锁对应的 Key 配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。
  • Revision 机制:每个 Key 带有一个 Revision 号,每进行一次事务便加一,因此它是全局唯一的,如初始值为 0,进行一次 put(key, value),Key 的 Revision 变为 1,同样的操作,再进行一次,Revision 变为 2;换成 key1 进行 put(key1, value) 操作,Revision 将变为 3;这种机制有一个作用:通过 Revision 的大小就可以知道写操作的顺序。在实现分布式锁时,多个客户端同时抢锁,根据 Revision 号大小依次获得锁,可以避免 “羊群效应” (也称“惊群效应”),实现公平锁。
  • Prefix 机制:即前缀机制,也称目录机制,例如,一个名为 /mylock 的锁,两个争抢它的客户端进行写操作,实际写入的 Key 分别为:key1="/mylock/UUID1",key2="/mylock/UUID2",其中,UUID 表示全局唯一的 ID,确保两个 Key 的唯一性。很显然,写操作都会成功,但返回的 Revision 不一样,那么,如何判断谁获得了锁呢?通过前缀“/mylock” 查询,返回包含两个 Key-Value 对的 Key-Value 列表,同时也包含它们的 Revision,通过 Revision 大小,客户端可以判断自己是否获得锁,如果抢锁失败,则等待锁释放(对应的 Key 被删除或者租约过期),然后再判断自己是否可以获得锁。
  • Watch 机制:即监听机制,Watch 机制支持监听某个固定的 Key,也支持监听一个范围(前缀机制),当被监听的 Key 或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败,可通过 Prefix 机制返回的 Key-Value 列表获得 Revision 比自己小且相差最小的 Key(称为 Pre-Key),对 Pre-Key 进行监听,因为只有它释放锁,自己才能获得锁,如果监听到 Pre-Key 的 DELETE 事件,则说明 Pre-Key 已经释放,自己已经持有锁。

2. Etcd Java 客户端 jetcd

jetcd 是 Etcd 的 Java 客户端,为 Etcd 的特性提供了丰富的接口,使用起来非常方便。不过,需要注意的是,jetcd 支持 Etcd V3 版本(Etcd 较早的版本是 V2),运行环境需 Java 1.8 及以上。

Git 地址:https://github.com/etcd-io/jetcd

2.1 jetcd 基本用法

首先创建一个 Maven 工程,导入 jetcd 依赖。目前,jetcd 最新的版本为:0.0.2

1
2
3
4
5
6
7
8
9
10
11
<dependency>

<groupId>io.etcd</groupId>

<artifactId>jetcd-core</artifactId>

<version>${jetcd-version}</version>

</dependency>


(1)Key-Value 客户端

Etcd 作为一个 Key-Value 存储系统,Key-Value 客户端是最基本的客户端,进行 Put、Get、Delete 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 创建客户端,本例中Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();

KV kvClient = client.getKVClient();

// 对String类型的Key-Value进行类型转换

ByteSequence key = ByteSequence.fromString("test_key");

ByteSequence value = ByteSequence.fromString("test_value");



// put操作,等待操作完成

kvClient.put(key, value).get();



// get操作,等待操作完成

CompletableFuture<GetResponse> getFuture = kvClient.get(key);

GetResponse response = getFuture.get();



// delete操作,等待操作完成

kvClient.delete(key).get();


(2)Lease 客户端

Lease 客户端,即租约客户端,用于创建租约、续约、解约,以及检索租约的详情,如租约绑定的键值等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建客户端,本例中Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();



// 创建Lease客户端

Lease leaseClient = client.getLeaseClient();



// 创建一个60s的租约,等待完成,超时设置阈值30s

Long leaseId = leaseClient.grant(60).get(30, TimeUnit.SECONDS).getID();



// 使指定的租约永久有效,即永久租约

leaseClient.keepAlive(leaseId);



// 续约一次

leaseClient.keepAliveOnce(leaseId);



// 解除租约,绑定该租约的键值将被删除

leaseClient.revoke(leaseId);



// 检索指定ID对应的租约的详细信息

LeaseTimeToLiveResponse lTRes = leaseClient.timeToLive(leaseId, LeaseOption.newBuilder().withAttachedKeys().build()).get();


(3)Watch 客户端

监听客户端,可为 Key 或者目录(前缀机制)创建 Watcher,Watcher 可以监听 Key 的事件(Put、Delete 等),如果事件发生,可以通知客户端,客户端采取某些措施。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 创建客户端,本例中Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();



// 对String类型的Key进行类型转换

ByteSequence key = ByteSequence.fromString("test_key");



// 创建Watch客户端

Watch watchClient = client.getWatchClient();

// 为Key创建一个Watcher

Watcher watcher = watch.watch(key);



// 开始listen,如果监听的Key有事件(如删除、更新等)发生则返回

WatchResponse response = null;

try

{

response = watcher.listen();

}

catch (InterruptedException e)

{

System.out.println("Failed to listen key:"+e);

}

if(response != null)

{

List<WatchEvent> eventlist = res.getEvents();

// 解析eventlist,判断是否为自己关注的事件,作进一步操作

// To do something

}


(4)Cluster 客户端

为了保障高可用性,实际应用中 Etcd 应工作于集群模式下,集群节点数量为大于 3 的奇数,为了灵活的管理集群,jetcd 提供了集群管理客户端,支持获取集群成员列表、增加成员、删除成员等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 创建客户端,本例中Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();



// 创建Cluster客户端

Cluster clusterClient = client.getClusterClient();



// 获取集群成员列表

List<Member> list = clusterClient.listMember().get().getMembers();



// 向集群中添加成员

String tempAddr = "http://localhost:2389";

List<String> peerAddrs = new ArrayList<String>();

peerAddrs.add(tempAddr);

clusterClient.addMember(peerAddrs);



// 根据成员ID删除成员

long memberID = 8170086329776576179L;

clusterClient.removeMember(memberID);



// 更新

clusterClient.updateMember(memberID, peerAddrs);


(5)Maintenance 客户端

Etcd 本质上是一个 Key-Value 存储系统,在一系列的 Put、Get、Delete 及 Compact 操作后,集群节点可能出现键空间不足等告警,通过 Maintenance 客户端,可以进行告警查询、告警解除、整理压缩碎片等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 创建客户端,本例中Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();



// 创建一个Maintenance 客户端

Maintenance maintClient = client.getMaintenanceClient();



// 获取指定节点的状态,对res做进一步解析可得节点状态详情

StatusResponse res = maintClient.statusMember("http://localhost:2379").get();



// 对指定的节点进行碎片整理,在压缩键空间之后,后端数据库可能呈现内部碎片,需进行整理

// 整理碎片是一个“昂贵”的操作,应避免同时对多个节点进行整理

maintClient.defragmentMember("http://localhost:2379").get();



// 获取所有活跃状态的键空间告警

List<AlarmMember> alarmList = maintClient.listAlarms().get().getAlarms();



// 解除指定键空间的告警

maintClient.alarmDisarm(alarmList.get(0));


3. Etcd 实现分布式锁:路线一

通过前面章节的铺垫,对于如何用 Etcd 实现分布式锁,相信读者已经心中有数,理解了原理,实现反而是简单的。在此,我给出一个 Demo 供读者参考。

3.1 基于 Etcd 的分布式锁业务流程

下面描述了使用 Etcd 实现分布式锁的业务流程,假设对某个共享资源设置的锁名为:/lock/mylock

步骤1:准备

客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 Key,假设第一个客户端对应的 Key="/lock/mylock/UUID1",第二个为 Key="/lock/mylock/UUID2";客户端分别为自己的 Key 创建租约 Lease,租约的长度根据业务耗时确定,假设为 15s。

步骤2:创建定时任务作为租约的“心跳”

在一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,Key 将因租约到期而被删除,从而锁释放,避免死锁。

步骤3:客户端将自己全局唯一的 Key 写入 Etcd

进行 Put 操作,将步骤 1 中创建的 Key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 Put 操作返回的 Revision 分别为1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。

步骤4:客户端判断是否获得锁

客户端以前缀 /lock/mylock 读取 Key-Value 列表(Key-Value 中带有 Key 对应的 Revision),判断自己 Key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 Key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

步骤5:执行业务

获得锁后,操作共享资源,执行业务代码。

步骤6:释放锁

完成业务流程后,删除对应的 Key 释放锁。

3.2 基于 Etcd 的分布式锁的原理图

根据上一节中介绍的业务流程,基于Etcd的分布式锁示意图如下。

enter image description here

业务流程图大家可参看这篇文章《Zookeeper 分布式锁实现原理》

3.3 基于 Etcd 实现分布式锁的客户端 Demo

Demo 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
import java.util.List;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;



import com.coreos.jetcd.Client;

import com.coreos.jetcd.KV;

import com.coreos.jetcd.Lease;

import com.coreos.jetcd.Watch.Watcher;

import com.coreos.jetcd.options.GetOption;

import com.coreos.jetcd.options.GetOption.SortTarget;

import com.coreos.jetcd.options.PutOption;

import com.coreos.jetcd.watch.WatchEvent;

import com.coreos.jetcd.watch.WatchResponse;

import com.coreos.jetcd.data.ByteSequence;

import com.coreos.jetcd.data.KeyValue;

import com.coreos.jetcd.kv.PutResponse;



import java.util.UUID;



/**

* Etcd 客户端代码,用多个线程“抢锁”模拟分布式系统中,多个进程“抢锁”

*

*/

public class EtcdClient

{



public static void main(String[] args) throws InterruptedException, ExecutionException,

TimeoutException, ClassNotFoundException

{

// 创建Etcd客户端,Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();



// 对于某共享资源制定的锁名

String lockName = "/lock/mylock";



// 模拟分布式场景下,多个进程“抢锁”

for (int i = 0; i < 3; i++)

{

new MyThread(lockName, client).start();

}

}



/**

* 加锁方法,返回值为加锁操作中实际存储于Etcd中的Key,即:lockName+UUID,

* 根据返回的Key,可删除存储于Etcd中的键值对,达到释放锁的目的。

*

* @param lockName

* @param client

* @param leaseId

* @return

*/

public static String lock(String lockName, Client client, long leaseId)

{

// lockName作为实际存储在Etcd的中的Key的前缀,后缀是一个全局唯一的ID,从而确保:对于同一个锁,不同进程存储的Key具有相同的前缀,不同的后缀

StringBuffer strBufOfRealKey = new StringBuffer();

strBufOfRealKey.append(lockName);

strBufOfRealKey.append("/");

strBufOfRealKey.append(UUID.randomUUID().toString());



// 加锁操作实际上是一个put操作,每一次put操作都会使revision增加1,因此,对于任何一次操作,这都是唯一的。(get,delete也一样)

// 可以通过revision的大小确定进行抢锁操作的时序,先进行抢锁的,revision较小,后面依次增加。

// 用于记录自己“抢锁”的Revision,初始值为0L

long revisionOfMyself = 0L;



KV kvClient = client.getKVClient();

// lock,尝试加锁,加锁只关注Key,value不为空即可。

// 注意:这里没有考虑可靠性和重试机制,实际应用中应考虑put操作而重试

try

{

PutResponse putResponse = kvClient

.put(ByteSequence.fromString(strBufOfRealKey.toString()),

ByteSequence.fromString("value"),

PutOption.newBuilder().withLeaseId(leaseId).build())

.get(10, TimeUnit.SECONDS);



// 获取自己加锁操作的Revision号

revisionOfMyself = putResponse.getHeader().getRevision();

}

catch (InterruptedException | ExecutionException | TimeoutException e1)

{

System.out.println("[error]: lock operation failed:" + e1);

}



try

{

// lockName作为前缀,取出所有键值对,并且根据Revision进行升序排列,版本号小的在前

List<KeyValue> kvList = kvClient.get(ByteSequence.fromString(lockName),

GetOption.newBuilder().withPrefix(ByteSequence.fromString(lockName))

.withSortField(SortTarget.MOD).build())

.get().getKvs();



// 如果自己的版本号最小,则表明自己持有锁成功,否则进入监听流程,等待锁释放

if (revisionOfMyself == kvList.get(0).getModRevision())

{

System.out.println("[lock]: lock successfully. [revision]:" + revisionOfMyself);

// 加锁成功,返回实际存储于Etcd中的Key

return strBufOfRealKey.toString();

}

else

{

// 记录自己加锁操作的前一个加锁操作的索引,因为只有前一个加锁操作完成并释放,自己才能获得锁

int preIndex = 0;

for (int index = 0; index < kvList.size(); index++)

{

if (kvList.get(index).getModRevision() == revisionOfMyself)

{

preIndex = index - 1;// 前一个加锁操作,故比自己的索引小1

}

}

// 根据索引,获得前一个加锁操作对应的Key

ByteSequence preKeyBS = kvList.get(preIndex).getKey();



// 创建一个Watcher,用于监听前一个Key

Watcher watcher = client.getWatchClient().watch(preKeyBS);

WatchResponse res = null;

// 监听前一个Key,将处于阻塞状态,直到前一个Key发生delete事件

// 需要注意的是,一个Key对应的事件不只有delete,不过,对于分布式锁来说,除了加锁就是释放锁

// 因此,这里只要监听到事件,必然是delete事件或者Key因租约过期而失效删除,结果都是锁被释放

try

{

System.out.println("[lock]: keep waiting until the lock is released.");

res = watcher.listen();

}

catch (InterruptedException e)

{

System.out.println("[error]: failed to listen key.");

}



// 为了便于读者理解,此处写一点冗余代码,判断监听事件是否为DELETE,即释放锁

List<WatchEvent> eventlist = res.getEvents();

for (WatchEvent event : eventlist)

{

// 如果监听到DELETE事件,说明前一个加锁操作完成并已经释放,自己获得锁,返回

if (event.getEventType().toString().equals("DELETE"))

{

System.out.println("[lock]: lock successfully. [revision]:" + revisionOfMyself);

return strBufOfRealKey.toString();

}

}

}

}

catch (InterruptedException | ExecutionException e)

{

System.out.println("[error]: lock operation failed:" + e);

}



return strBufOfRealKey.toString();

}



/**

* 释放锁方法,本质上就是删除实际存储于Etcd中的Key

*

* @param lockName

* @param client

*/

public static void unLock(String realLockName, Client client)

{

try

{

client.getKVClient().delete(ByteSequence.fromString(realLockName)).get(10,

TimeUnit.SECONDS);

System.out.println("[unLock]: unlock successfully.[lockName]:" + realLockName);

}

catch (InterruptedException | ExecutionException | TimeoutException e)

{

System.out.println("[error]: unlock failed:" + e);

}

}



/**

* 自定义一个线程类,模拟分布式场景下多个进程 "抢锁"

*/

public static class MyThread extends Thread

{

private String lockName;

private Client client;



MyThread(String lockName, Client client)

{

this.client = client;

this.lockName = lockName;

}



@Override

public void run()

{

// 创建一个租约,有效期15s

Lease leaseClient = client.getLeaseClient();

Long leaseId = null;

try

{

leaseId = leaseClient.grant(15).get(10, TimeUnit.SECONDS).getID();

}

catch (InterruptedException | ExecutionException | TimeoutException e1)

{

System.out.println("[error]: create lease failed:" + e1);

return;

}



// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;

// 同时,一旦客户端发生故障,心跳便会中断,锁也会应租约过期而被动释放,避免死锁

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

// 续约心跳为12s,仅作为举例

service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), 1, 12, TimeUnit.SECONDS);



// 1. try to lock

String realLoclName = lock(lockName, client, leaseId);



// 2. to do something

try

{

Thread.sleep(6000);

}

catch (InterruptedException e2)

{

System.out.println("[error]:" + e2);

}

// 3. unlock

service.shutdown();// 关闭续约的定时任务

unLock(realLoclName, client);

}

}



/**

* 在等待其它客户端释放锁期间,通过心跳续约,保证自己的Key-Value不会失效

*

*/

public static class KeepAliveTask implements Runnable

{

private Lease leaseClient;

private long leaseId;



KeepAliveTask(Lease leaseClient, long leaseId)

{

this.leaseClient = leaseClient;

this.leaseId = leaseId;

}



@Override

public void run()

{

leaseClient.keepAliveOnce(leaseId);

}

}

}


Demo 运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[lock]: lock successfully. [revision]:44

[lock]: keep waiting until the lock is released.

[lock]: keep waiting until the lock is released.

[unLock]: unlock successfully.

[lock]: lock successfully. [revision]:45

[unLock]: unlock successfully.

[lock]: lock successfully. [revision]:46

[unLock]: unlock successfully.


4. Etcd 实现分布式锁:路线二

Etcd 最新版本提供了支持分布式锁的基础接口,其本质就是将第 3 节(路线一)中介绍的实现细节进行了封装。单从使用的角度来看,这是非常有益的,大大降低了分布式锁的实现难度。但,与此同时,简化的接口也无形中为用户理解内部原理设置了屏障。

4.1 Etcd 提供的分布式锁基础接口

在介绍 jetcd 提供的 Lock 客户端之前,我们先用 Etcd 官方提供的 Go 语言客户端(etcdctl)验证一下分布式锁的实现原理。解压官方提供的 Etcd 安装包,里面有两个可执行文件:etcd 和 etcdctl,其中 etcd 是服务端,etcdctl 是客户端。在服务端启动的前提下,执行以下命令验证分布式锁原理。

(1)分别开启两个窗口,进入 etcdctl 所在目录,执行以下命令,显式指定 API 版本为 V3,老版本 V2 不支持分布式锁接口。

1
2
3
export ETCDCTL_API=3


(2)分别在两个窗口执行相同的加锁命令:

1
2
3
./etcdctl.exe lock mylock


(3)可以观察到,只有一个加锁成功,并返回了实际存储与 Etcd 中 Key 值:

1
2
3
4
5
$ ./etcdctl.exe lock mylock

mylock/694d65eb367c7ec4


(4)在加锁成功的窗口执行命令:Ctrl+c,释放锁;与此同时,另一个窗口加锁成功,如下所示:

1
2
3
4
5
$ ./etcdctl.exe lock mylock

mylock/694d65eb367c7ec8


很明显,同样的锁名“mylock”,两个客户端分别进行加锁操作,实际存储于 Etcd 中的 Key 并不是 “mylock”,而是以 “mylock” 为前缀,分别加了一个全局唯一的 ID。是不是和 “路线一” 中介绍得原理一致呢。

4.2 Etcd Java 客户端 jetcd 提供的 Lock 客户端

作为 Etcd 的 Java 客户端,jetcd 全面支持 Etcd 的 V3 接口,其中分布式锁相关的接口如下。看上去很简单,但事实上存在一个问题:租约没有心跳机制,在等待其它客户端释放锁期间,自己的租约存在过期的风险。鉴于此,需要进行改造。抛砖引玉,我在 4.3 节中提供了一个 Demo 供读者参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 创建客户端,本例中Etcd服务端为单机模式

Client client = Client.builder().endpoints("http://localhost:2379").build();



// 创建Lock客户端

Lock lockClient = client.getLockClient();



// 创建Lease客户端,并创建一个有效期为30s的租约

Lease leaseClient = client.getLeaseClient();

long leaseId = leaseClient.grant(30).get().getID();



// 加、解锁操作

try

{

// 调用lock接口,加锁,并绑定租约

lockClient.lock(ByteSequence.fromString("lockName"), leaseId).get();

// 调用unlock接口,解锁

lockClient.unlock(ByteSequence.fromString(lockName)).get();

}

catch (InterruptedException | ExecutionException e1)

{

System.out.println("[error]: lock failed:" + e1);

}


4.3 基于 Etcd 的 Lock 接口实现分布式锁 Demo

第一部分:分布式锁实现

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;



import com.coreos.jetcd.Client;

import com.coreos.jetcd.Lease;

import com.coreos.jetcd.Lock;

import com.coreos.jetcd.data.ByteSequence;



public class DistributedLock

{

private static DistributedLock lockProvider = null;

private static Object mutex = new Object();

private Client client;

private Lock lockClient;

private Lease leaseClient;



private DistributedLock()

{

super();

// 创建Etcd客户端,本例中Etcd集群只有一个节点

this.client = Client.builder().endpoints("http://localhost:2379").build();

this.lockClient = client.getLockClient();

this.leaseClient = client.getLeaseClient();

}



public static DistributedLock getInstance()

{

synchronized (mutex)

{

if (null == lockProvider)

{

lockProvider = new DistributedLock();

}

}

return lockProvider;

}



/**

* 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。

*

* @param lockName: 针对某一共享资源(数据、文件等)制定的锁名

* @param TTL : Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁

* @return LockResult

*/

public LockResult lock(String lockName, long TTL)

{

LockResult lockResult = new LockResult();

/*1.准备阶段*/

// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;

// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();



// 初始化返回值lockResult

lockResult.setIsLockSuccess(false);

lockResult.setService(service);



// 记录租约ID,初始值设为 0L

Long leaseId = 0L;



/*2.创建租约*/

// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定。

try

{

leaseId = leaseClient.grant(TTL).get().getID();

lockResult.setLeaseId(leaseId);



// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定。

long period = TTL - TTL / 5;

service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period,

TimeUnit.SECONDS);

}

catch (InterruptedException | ExecutionException e)

{

System.out.println("[error]: Create lease failed:" + e);

return lockResult;

}



System.out.println("[lock]: start to lock." + Thread.currentThread().getName());



/*3.加锁操作*/

// 执行加锁操作,并为锁对应的Key绑定租约

try

{

lockClient.lock(ByteSequence.fromString(lockName), leaseId).get();

}

catch (InterruptedException | ExecutionException e1)

{

System.out.println("[error]: lock failed:" + e1);

return lockResult;

}

System.out.println("[lock]: lock successfully." + Thread.currentThread().getName());



lockResult.setIsLockSuccess(true);



return lockResult;

}



/**

* 解锁操作,释放锁、关闭定时任务、解除租约

*

* @param lockName:锁名

* @param lockResult:加锁操作返回的结果

*/

public void unLock(String lockName, LockResult lockResult)

{

System.out.println("[unlock]: start to unlock." + Thread.currentThread().getName());

try

{

// 释放锁

lockClient.unlock(ByteSequence.fromString(lockName)).get();

// 关闭定时任务

lockResult.getService().shutdown();

// 删除租约

if (lockResult.getLeaseId() != 0L)

{

leaseClient.revoke(lockResult.getLeaseId());

}

}

catch (InterruptedException | ExecutionException e)

{

System.out.println("[error]: unlock failed: " + e);

}



System.out.println("[unlock]: unlock successfully." + Thread.currentThread().getName());

}



/**

* 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效

*

*/

public static class KeepAliveTask implements Runnable

{

private Lease leaseClient;

private long leaseId;



KeepAliveTask(Lease leaseClient, long leaseId)

{

this.leaseClient = leaseClient;

this.leaseId = leaseId;

}



@Override

public void run()

{

// 续约一次

leaseClient.keepAliveOnce(leaseId);

}

}



/**

* 该class用于描述加锁的结果,同时携带解锁操作所需参数

*

*/

public static class LockResult

{

private boolean isLockSuccess;

private long leaseId;

private ScheduledExecutorService service;



LockResult()

{

super();

}



public void setIsLockSuccess(boolean isLockSuccess)

{

this.isLockSuccess = isLockSuccess;

}



public void setLeaseId(long leaseId)

{

this.leaseId = leaseId;

}



public void setService(ScheduledExecutorService service)

{

this.service = service;

}



public boolean getIsLockSuccess()

{

return this.isLockSuccess;

}



public long getLeaseId()

{

return this.leaseId;

}



public ScheduledExecutorService getService()

{

return this.service;

}

}



}


第二部分:测试代码

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class DistributedLockTest

{



public static void main(String[] args)

{

// 模拟分布式场景下,多个进程 “抢锁”

for (int i = 0; i < 5; i++)

{

new MyThread().start();

}

}



public static class MyThread extends Thread

{

@Override

public void run()

{

String lockName = "/lock/mylock";

// 1. 加锁

LockResult lockResult = DistributedLock.getInstance().lock(lockName, 30);



// 2. 执行业务

if (lockResult.getIsLockSuccess())

{

// 获得锁后,执行业务,用sleep方法模拟.

try

{

Thread.sleep(10000);

}

catch (InterruptedException e)

{

System.out.println("[error]:" + e);

}

}



// 3. 解锁

DistributedLock.getInstance().unLock(lockName, lockResult);

}

}

}


第三部分:测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
[lock]: start to lock.Thread-4

[lock]: start to lock.Thread-3

[lock]: start to lock.Thread-1

[lock]: start to lock.Thread-0

[lock]: start to lock.Thread-2

[lock]: lock successfully.Thread-3

[unlock]: start to unlock.Thread-3

[unlock]: unlock successfully.Thread-3

[lock]: lock successfully.Thread-2

[unlock]: start to unlock.Thread-2

[unlock]: unlock successfully.Thread-2

[lock]: lock successfully.Thread-1

[unlock]: start to unlock.Thread-1

[unlock]: unlock successfully.Thread-1

[lock]: lock successfully.Thread-0

[unlock]: start to unlock.Thread-0

[unlock]: unlock successfully.Thread-0

[lock]: lock successfully.Thread-4

[unlock]: start to unlock.Thread-4

[unlock]: unlock successfully.Thread-4