分散式鎖的概念以及常規解決方案可以參考之前的部落格:聊聊分散式鎖的解決方案;今天我們先分析下分散式鎖的實現思路;
提示:需要關注下圖裡判斷自身不是最小節點時的監聽情況,為什麼不監聽父節點?原因圖裡已有描述,這裡就不再贅述
通過不斷的偵錯,我封裝了一個ZkLockHelper
類,裡面封裝了上鎖和釋放鎖的方法,為了方便我將zk的一些監聽和回撥機智也融合到一起了,並沒有抽出來,下面貼上該類的全部程式碼
package com.darling.service.zookeeper.lock;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: dll
* @date: Created in 2022/11/4 8:41
* @version:
* @modified By:
*/
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {
private final String lockPath = "/lockItem";
ZooKeeper zkClient;
String threadName;
CountDownLatch cd = new CountDownLatch(1);
private String pathName;
/**
* 上鎖
*/
public void tryLock() {
try {
log.info("執行緒:{}正在建立節點",threadName);
zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");
log.info("執行緒:{}正在阻塞......",threadName);
// 由於上面是非同步建立所以這裡需要阻塞住當前執行緒
cd.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 釋放鎖
*/
public void unLock() {
try {
zkClient.delete(pathName,-1);
System.out.println(threadName + " 工作結束....");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* create方法的回撥,建立成功後在此處獲取/DCSLock的子目錄,比較節點ID是否最小,是則拿到鎖。。。
* @param rc 狀態碼
* @param path create方法的path入參
* @param ctx create方法的上下文入參
* @param name 建立成功的臨時有序節點的名稱,即在path的後面加上了zk維護的自增ID;
* 注意如果建立的不是有序節點,那麼此處的name和path的內容一致
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);
if (StringUtils.isNotBlank(name)) {
try {
pathName = name ;
// 此處path需注意要寫/
zkClient.getChildren("/", false,this,"123");
// List<String> children = zkClient.getChildren("/", false);
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 給children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判斷自身是否第一個
// if (Objects.equals(i,0)) {
// // 是第一個則表示搶到了鎖
// log.info("執行緒{}搶到了鎖",threadName);
// cd.countDown();
// }else {
// // 表示沒搶到鎖
// log.info("執行緒{}搶鎖失敗,重新註冊監聽器",threadName);
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* exists方法的回撥,此處暫不做處理
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
/**
* exists的watch監聽
* @param event
*/
@Override
public void process(WatchedEvent event) {
//如果第一個執行緒鎖釋放了,等價於第一個執行緒刪除了節點,此時只有第二個執行緒會監控的到
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zkClient.getChildren("/", false,this,"123");
// // 此處path需注意要寫"/"
// List<String> children = null;
// try {
// children = zkClient.getChildren("/", false);
// } catch (KeeperException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 給children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判斷自身是否第一個
// if (Objects.equals(i,0)) {
// // 是第一個則表示搶到了鎖
// log.info("執行緒{}搶到了鎖",threadName);
// cd.countDown();
// }else {
// /**
// * 表示沒搶到鎖;需要判斷前置節點存不存在,其實這裡並不是特別關心前置節點存不存在,所以其回撥可以不處理;
// * 但是這裡關注的前置節點的監聽,當前置節點監聽到被刪除時就是其他執行緒搶鎖之時
// */
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
/**
* getChildren方法的回撥
* @param rc
* @param path
* @param ctx
* @param children
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
try {
log.info(">>>>>threadName:{},children:{}", threadName, children);
if (Objects.isNull(children)) {
return;
}
// 給children排序
Collections.sort(children);
int i = children.indexOf(pathName.substring(1));
// 判斷自身是否第一個
if (Objects.equals(i, 0)) {
// 是第一個則表示搶到了鎖
log.info("執行緒{}搶到了鎖", threadName);
cd.countDown();
} else {
// 表示沒搶到鎖
log.info("執行緒{}搶鎖失敗,重新註冊監聽器", threadName);
/**
* 表示沒搶到鎖;需要判斷前置節點存不存在,其實這裡並不是特別關心前置節點存不存在,所以其回撥可以不處理;
* 但是這裡關注的前置節點的監聽,當前置節點監聽到被刪除時就是其他執行緒搶鎖之時
*/
zkClient.exists("/" + children.get(i - 1), this, this, "AAA");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
提示:程式碼中註釋的程式碼塊可以關注下,原本是直接阻塞式程式設計,將獲取所有子節點並釋放鎖的操作直接寫在getChildren方法的回撥裡,後來發現當節點被刪除時我們還要重新搶鎖,那麼程式碼就冗餘了,於是結合響應式程式設計的思想,將這段核心程式碼放到
getChildren方法的回撥
裡,這樣程式碼簡潔了並且可以讓業務更只關注於getChildren
這件事了
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
/**
* @description: 開啟是個執行緒給i做遞減操作,未加鎖的情況下會有執行緒安全問題
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest02 {
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
incre();
}
}).start();
}
Thread.sleep(5000);
log.info("i = {}",i);
}
/**
* i遞減 執行緒不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("當前執行緒:{},i = {}",Thread.currentThread().getName(),i--);
}
}
ZkLockHelper
實現的分散式鎖package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @description: 使用zk實現的分散式鎖解決執行緒安全問題
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest03 {
ZooKeeper zkClient;
@Before
public void conn (){
zkClient = ZkUtil.getZkClient();
}
@After
public void close (){
try {
zkClient.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
ZkLockHelper zkHelper = new ZkLockHelper();
// 這裡給zkHelper設定threadName是為了後續偵錯的時候紀錄檔列印,便於觀察存在的問題
String threadName = Thread.currentThread().getName();
zkHelper.setThreadName(threadName);
zkHelper.setZkClient(zkClient);
// tryLock上鎖
zkHelper.tryLock();
incre();
log.info("執行緒{}正在執行業務程式碼...",threadName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 釋放鎖
zkHelper.unLock();
}
}).start();
}
while (true) {
}
}
/**
* i遞減 執行緒不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆當前執行緒:{},i = {}",Thread.currentThread().getName(),i--);
}
}
由於紀錄檔中摻雜著zk的紀錄檔所有此處並未截全,但是也能看到i是在按規律遞減的,不會出現通過執行緒拿到相同值的情況
好了,zk實現分散式鎖的編碼實現就到這了,後續有時間再寫偏redis的,其實思路縷清了,編碼實現還是很簡單的