這裡再切入本例將使用的場景模擬:商品秒殺,或者說高並行下,對於商品庫存扣減操作。我用一個SpringBoot小專案模擬一下該操作。
本例用到的技術棧:
SpringBoot
Redis
etcd
最新2020整理收集的一些面試題(都整理成檔案),有很多幹貨,包含netty,spring,執行緒,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感覺在面試這塊講的非常清楚:獲取面試資料只需:點選這裡領取!!! 暗號:CSDN
在正式肝程式碼之前,先來對etcd分散式鎖實現的機制和原理做一個瞭解。
Lease機制
租約機制(TTL,Time To Live),etcd 可以為儲存的 key-value 對設定租約,當租約到期,key-value 將失效刪除;
同時也支援續約,通過使用者端可以在租約到期之前續約,
以避免 key-value 對過期失效。
Lease 機制可以保證分散式鎖的安全性,為鎖對應的 key 設定租約,
即使鎖的持有者因故障而不能主動釋放鎖,鎖也會因租約到期而自動釋放。
Revision機制
每個 key 帶有一個 Revision 號,每進行一次事務便+1,它是全域性唯一的,
通過 Revision 的大小就可以知道進行寫操作的順序。
在實現分散式鎖時,多個使用者端同時搶鎖,
根據 Revision 號大小依次獲得鎖,可以避免 「羊群效應」 ,實現公平鎖。
羊群效應:羊群是一種很散亂的組織,平時在一起也是盲目地左衝右撞,但一旦有一隻頭羊動起來,其他的羊也會不假思索地一哄而上,全然不顧旁邊可能有的狼和不遠處更好的草。
etcd的Revision機制,可以根據Revision號的大小順序進行寫操作,因而可以避免「羊群效應」。
這和zookeeper的臨時順序節點+監聽機制可以避免羊群效應的原理是一致的。
Prefix機制
即字首機制。
例如,一個名為 /etcd/lock 的鎖,兩個爭搶它的使用者端進行寫操作,
實際寫入的 key 分別為:key1="/etcd/lock/UUID1",key2="/etcd/lock/UUID2"。
其中,UUID 表示全域性唯一的 ID,確保兩個 key 的唯一性。
寫操作都會成功,但返回的 Revision 不一樣,
那麼,如何判斷誰獲得了鎖呢?通過字首 /etcd/lock 查詢,返回包含兩個 key-value 對的的 KeyValue 列表,
同時也包含它們的 Revision,通過 Revision 大小,使用者端可以判斷自己是否獲得鎖。
Watch機制
即監聽機制。
Watch 機制支援 Watch 某個固定的 key,也支援 Watch 一個範圍(字首機制)。
當被 Watch 的 key 或範圍發生變化,使用者端將收到通知;在實現分散式鎖時,如果搶鎖失敗,
可通過 Prefix 機制返回的 Key-Value 列表獲得 Revision 比自己小且相差最小的 key(稱為 pre-key),
對 pre-key 進行監聽,因為只有它釋放鎖,自己才能獲得鎖,如果 Watch 到 pre-key 的 DELETE 事件,
則說明 pre-key 已經釋放,自己將持有鎖。
建立連線
使用者端連線 etcd,以 /etcd/lock 為字首建立全域性唯一的 key,
假設第一個使用者端對應的 key="/etcd/lock/UUID1",第二個為 key="/etcd/lock/UUID2";
使用者端分別為自己的 key 建立租約 - Lease,租約的長度根據業務耗時確定;
建立定時任務作為租約的「心跳」
當一個使用者端持有鎖期間,其它使用者端只能等待,為了避免等待期間租約失效,
使用者端需建立一個定時任務作為「心跳」進行續約。此外,如果持有鎖期間使用者端崩潰,
心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖;
使用者端將自己全域性唯一的 key 寫入 etcd
執行 put 操作,將步驟 1 中建立的 key 繫結租約寫入 Etcd,根據 Etcd 的 Revision 機制,
假設兩個使用者端 put 操作返回的 Revision 分別為 1、2,使用者端需記錄 Revision 用以
接下來判斷自己是否獲得鎖;
使用者端判斷是否獲得鎖
使用者端以字首 /etcd/lock/ 讀取 key-Value 列表,判斷自己 key 的 Revision 是否為當前列表中
最小的,如果是則認為獲得鎖;否則監聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖;
執行業務
獲得鎖後,操作共用資源,執行業務程式碼
釋放鎖
完成業務流程後,刪除對應的key釋放鎖
有了以上理論做基礎,我們開始etcd分散式鎖的程式碼實現。
jetcd是etcd的Java使用者端,它提供了豐富的介面來操作etcd,使用方便。
初始化庫存stock=300,再設定一個lucky=0,表示搶到庫存的人,實際場景中可以是使用者訂單資訊,每扣減一個庫存,lucky便加1。
由於etcd的Lock介面有一套自己的實現,zookeeper的Lock介面也有自己的一套實現,redis…各種分散式鎖實現方案都有自己的Lock,因此,我封裝了一個模板方法:
/**
* @program: distributed-lock
* @description: 各種分散式鎖的基礎類別,模板方法
* @author: 行百里者
* @create: 2020/10/14 12:29
**/
public class AbstractLock implements Lock {
@Override
public void lock() {
throw new RuntimeException("請自行實現該方法");
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new RuntimeException("請自行實現該方法");
}
@Override
public boolean tryLock() {
throw new RuntimeException("請自行實現該方法");
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new RuntimeException("請自行實現該方法");
}
@Override
public void unlock() {
throw new RuntimeException("請自行實現該方法");
}
@Override
public Condition newCondition() {
throw new RuntimeException("請自行實現該方法");
}
}
有了這個模板方法之後,後續分散式鎖的實現均可以繼承這個模板方法類。
@Data
public class EtcdDistributedLock extends AbstractLock {
private final static Logger LOGGER = LoggerFactory.getLogger(EtcdDistributedLock.class);
private Client client;
private Lock lockClient;
private Lease leaseClient;
private String lockKey;
//鎖路徑,方便記錄紀錄檔
private String lockPath;
//鎖的次數
private AtomicInteger lockCount;
//租約有效期。作用 1:使用者端崩潰,租約到期後自動釋放鎖,防止死鎖 2:正常執行自動進行續租
private Long leaseTTL;
//續約鎖租期的定時任務,初次啟動延遲,預設為1s,根據實際業務需要設定
private Long initialDelay = 0L;
//定時任務執行緒池
ScheduledExecutorService scheduledExecutorService;
//執行緒與鎖物件的對映
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
public EtcdDistributedLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
this.client = client;
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
this.lockKey = lockKey;
this.leaseTTL = unit.toNanos(leaseTTL);
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}
@Override
public void lock() {
}
@Override
public void unlock() {
}
}
其中lock方法的實現:
@Override
public void lock() {
Thread currentThread = Thread.currentThread();
LockData existsLockData = threadData.get(currentThread);
//System.out.println(currentThread.getName() + " 加鎖 existsLockData:" + existsLockData);
//鎖重入
if (existsLockData != null && existsLockData.isLockSuccess()) {
int lockCount = existsLockData.lockCount.incrementAndGet();
if (lockCount < 0) {
throw new Error("超出etcd鎖可重入次數限制");
}
return;
}
//建立租約,記錄租約id
long leaseId;
try {
leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
//續租心跳週期
long period = leaseTTL - leaseTTL / 5;
//啟動定時續約
scheduledExecutorService.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId),
initialDelay,
period,
TimeUnit.NANOSECONDS);
//加鎖
LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
if (lockResponse != null) {
lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
LOGGER.info("執行緒:{} 加鎖成功,鎖路徑:{}", currentThread.getName(), lockPath);
}
//加鎖成功,設定鎖物件
LockData lockData = new LockData(lockKey, currentThread);
lockData.setLeaseId(leaseId);
lockData.setService(scheduledExecutorService);
threadData.put(currentThread, lockData);
lockData.setLockSuccess(true);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
簡而言之,加鎖的程式碼就是按照如下步驟來的:
檢查鎖重入性
設定租約
開啟定時任務心跳檢查
阻塞獲取鎖
加鎖成功,設定鎖物件
業務處理完成(扣減庫存)後,解鎖:
@Override
public void unlock() {
Thread currentThread = Thread.currentThread();
//System.out.println(currentThread.getName() + " 釋放鎖..");
LockData lockData = threadData.get(currentThread);
//System.out.println(currentThread.getName() + " lockData " + lockData);
if (lockData == null) {
throw new IllegalMonitorStateException("執行緒:" + currentThread.getName() + " 沒有獲得鎖,lockKey:" + lockKey);
}
int lockCount = lockData.lockCount.decrementAndGet();
if (lockCount > 0) {
return;
}
if (lockCount < 0) {
throw new IllegalMonitorStateException("執行緒:" + currentThread.getName() + " 鎖次數為負數,lockKey:" + lockKey);
}
try {
//正常釋放鎖
if (lockPath != null) {
lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
}
//關閉續約的定時任務
lockData.getService().shutdown();
//刪除租約
if (lockData.getLeaseId() != 0L) {
leaseClient.revoke(lockData.getLeaseId());
}
} catch (InterruptedException | ExecutionException e) {
//e.printStackTrace();
LOGGER.error("執行緒:" + currentThread.getName() + "解鎖失敗。", e);
} finally {
//移除當前執行緒資源
threadData.remove(currentThread);
}
LOGGER.info("執行緒:{} 釋放鎖", currentThread.getName());
}
解鎖過程:
重入性檢查
移除當前鎖的節點路徑釋放鎖
清除重入的執行緒資源
/**
* @program: distributed-lock
* @description: etcd分散式鎖演示-高並行下庫存扣減
* @author: 行百里者
* @create: 2020/10/15 13:24
**/
@RestController
public class StockController {
private final StringRedisTemplate redisTemplate;
@Value("${server.port}")
private String port;
@Value("${etcd.lockPath}")
private String lockKey;
private final Client etcdClient;
public StockController(StringRedisTemplate redisTemplate, @Value("${etcd.servers}") String servers) {
//System.out.println("etcd servers:" + servers);
this.redisTemplate = redisTemplate;
this.etcdClient = Client.builder().endpoints(servers.split(",")).build();
}
@RequestMapping("/stock/reduce")
public String reduceStock() {
Lock lock = new EtcdDistributedLock(etcdClient, lockKey, 30L, TimeUnit.SECONDS);
//獲得鎖
lock.lock();
//扣減庫存
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
//同時lucky+1
redisTemplate.opsForValue().increment("lucky");
} else {
System.out.println("庫存不足");
}
//釋放鎖
lock.unlock();
return port + " reduce stock end!";
}
}
這個就很簡單了,當一個請求打進來,先試圖上鎖,上鎖成功後,執行業務,扣減庫存,同時訂單資訊+1,業務處理完成後,釋放鎖。
壓力測試
測試介面已經完成,用JMeter模擬高並行場景,在同一時刻同時傳送500個請求(庫存只有300),觀察結果。
先啟動兩個服務,一個8080,一個8090:
設定nginx(主要為了方便模擬高並行和分散式):
nginx的IP地址是192.168.2.10:
因此,我們壓力測試,只需要向 http://192.168.2.10/stock/reduce 介面傳送請求即可。
執行壓測結果:
從結果表明我們的etcd分散式鎖成功!
最新2020整理收集的一些面試題(都整理成檔案),有很多幹貨,包含netty,spring,執行緒,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感覺在面試這塊講的非常清楚:獲取面試資料只需:點選這裡領取!!! 暗號:CSDN