大型分散式場景下比Redis分散式鎖更好的的實現方式etcd分散式鎖的實現流程原始碼分析

2020-10-25 10:00:27

這裡再切入本例將使用的場景模擬:商品秒殺,或者說高並行下,對於商品庫存扣減操作。我用一個SpringBoot小專案模擬一下該操作。
本例用到的技術棧:

SpringBoot
Redis
etcd
最新2020整理收集的一些面試題(都整理成檔案),有很多幹貨,包含netty,spring,執行緒,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感覺在面試這塊講的非常清楚:獲取面試資料只需:點選這裡領取!!! 暗號:CSDN在這裡插入圖片描述

在正式肝程式碼之前,先來對etcd分散式鎖實現的機制和原理做一個瞭解。

etcd分散式鎖實現的基礎機制

Lease機制
租約機制(TTL,Time To Live),etcd 可以為儲存的 key-value 對設定租約,當租約到期,key-value 將失效刪除;
同時也支援續約,通過使用者端可以在租約到期之前續約,
以避免 key-value 對過期失效。
Lease 機制可以保證分散式鎖的安全性,為鎖對應的 key 設定租約,
即使鎖的持有者因故障而不能主動釋放鎖,鎖也會因租約到期而自動釋放。
Revision機制
每個 key 帶有一個 Revision 號,每進行一次事務便+1,它是全域性唯一的,
通過 Revision 的大小就可以知道進行寫操作的順序。
在實現分散式鎖時,多個使用者端同時搶鎖,
根據 Revision 號大小依次獲得鎖,可以避免 「羊群效應」 ,實現公平鎖。

羊群效應:羊群是一種很散亂的組織,平時在一起也是盲目地左衝右撞,但一旦有一隻頭羊動起來,其他的羊也會不假思索地一哄而上,全然不顧旁邊可能有的狼和不遠處更好的草。
etcd的Revision機制,可以根據Revision號的大小順序進行寫操作,因而可以避免「羊群效應」。
這和zookeeper的臨時順序節點+監聽機制可以避免羊群效應的原理是一致的。

Prefix機制
即字首機制。
例如,一個名為 /etcd/lock 的鎖,兩個爭搶它的使用者端進行寫操作,
實際寫入的 key 分別為:key1="/etcd/lock/UUID1",key2="/etcd/lock/UUID2"。
其中,UUID 表示全域性唯一的 ID,確保兩個 key 的唯一性。
寫操作都會成功,但返回的 Revision 不一樣,
那麼,如何判斷誰獲得了鎖呢?通過字首 /etcd/lock 查詢,返回包含兩個 key-value 對的的 KeyValue 列表,
同時也包含它們的 Revision,通過 Revision 大小,使用者端可以判斷自己是否獲得鎖。
Watch機制
即監聽機制。
Watch 機制支援 Watch 某個固定的 key,也支援 Watch 一個範圍(字首機制)。
當被 Watch 的 key 或範圍發生變化,使用者端將收到通知;在實現分散式鎖時,如果搶鎖失敗,
可通過 Prefix 機制返回的 Key-Value 列表獲得 Revision 比自己小且相差最小的 key(稱為 pre-key),
對 pre-key 進行監聽,因為只有它釋放鎖,自己才能獲得鎖,如果 Watch 到 pre-key 的 DELETE 事件,
則說明 pre-key 已經釋放,自己將持有鎖。

etcd分散式鎖原理圖

在這裡插入圖片描述

etcd分散式鎖的實現流程

建立連線

使用者端連線 etcd,以 /etcd/lock 為字首建立全域性唯一的 key,
假設第一個使用者端對應的 key="/etcd/lock/UUID1",第二個為 key="/etcd/lock/UUID2";
使用者端分別為自己的 key 建立租約 - Lease,租約的長度根據業務耗時確定;

建立定時任務作為租約的「心跳」

當一個使用者端持有鎖期間,其它使用者端只能等待,為了避免等待期間租約失效,
使用者端需建立一個定時任務作為「心跳」進行續約。此外,如果持有鎖期間使用者端崩潰,
心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖;

使用者端將自己全域性唯一的 key 寫入 etcd

執行 put 操作,將步驟 1 中建立的 key 繫結租約寫入 Etcd,根據 Etcd 的 Revision 機制,
假設兩個使用者端 put 操作返回的 Revision 分別為 1、2,使用者端需記錄 Revision 用以
接下來判斷自己是否獲得鎖;

使用者端判斷是否獲得鎖

使用者端以字首 /etcd/lock/ 讀取 key-Value 列表,判斷自己 key 的 Revision 是否為當前列表中
最小的,如果是則認為獲得鎖;否則監聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖;

執行業務

獲得鎖後,操作共用資源,執行業務程式碼

釋放鎖

完成業務流程後,刪除對應的key釋放鎖

程式碼

有了以上理論做基礎,我們開始etcd分散式鎖的程式碼實現。

jetcd使用者端

jetcd是etcd的Java使用者端,它提供了豐富的介面來操作etcd,使用方便。

在這裡插入圖片描述

redis資料準備

初始化庫存stock=300,再設定一個lucky=0,表示搶到庫存的人,實際場景中可以是使用者訂單資訊,每扣減一個庫存,lucky便加1。在這裡插入圖片描述

etcd分散式鎖的實現

由於etcd的Lock介面有一套自己的實現,zookeeper的Lock介面也有自己的一套實現,redis…各種分散式鎖實現方案都有自己的Lock,因此,我封裝了一個模板方法:

/**
 * @program: distributed-lock
 * @description: 各種分散式鎖的基礎類別,模板方法
 * @author: 行百里者
 * @create: 2020/10/14 12:29
 **/
public class AbstractLock implements Lock {
    @Override
    public void lock() {
        throw new RuntimeException("請自行實現該方法");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new RuntimeException("請自行實現該方法");
    }

    @Override
    public boolean tryLock() {
        throw new RuntimeException("請自行實現該方法");
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new RuntimeException("請自行實現該方法");
    }

    @Override
    public void unlock() {
        throw new RuntimeException("請自行實現該方法");
    }

    @Override
    public Condition newCondition() {
        throw new RuntimeException("請自行實現該方法");
    }
}

有了這個模板方法之後,後續分散式鎖的實現均可以繼承這個模板方法類。

etcd分散式鎖的實現:

@Data
public class EtcdDistributedLock extends AbstractLock {
    private final static Logger LOGGER = LoggerFactory.getLogger(EtcdDistributedLock.class);

    private Client client;
    private Lock lockClient;
    private Lease leaseClient;
    private String lockKey;
    //鎖路徑,方便記錄紀錄檔
    private String lockPath;
    //鎖的次數
    private AtomicInteger lockCount;
    //租約有效期。作用 1:使用者端崩潰,租約到期後自動釋放鎖,防止死鎖 2:正常執行自動進行續租
    private Long leaseTTL;
    //續約鎖租期的定時任務,初次啟動延遲,預設為1s,根據實際業務需要設定
    private Long initialDelay = 0L;
    //定時任務執行緒池
    ScheduledExecutorService scheduledExecutorService;
    //執行緒與鎖物件的對映
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    public EtcdDistributedLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
        this.client = client;
        this.lockClient = client.getLockClient();
        this.leaseClient = client.getLeaseClient();
        this.lockKey = lockKey;
        this.leaseTTL = unit.toNanos(leaseTTL);
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void lock() {
        
    }

    @Override
    public void unlock() {
        
    }
}

其中lock方法的實現:

@Override
public void lock() {
    Thread currentThread = Thread.currentThread();
    LockData existsLockData = threadData.get(currentThread);
    //System.out.println(currentThread.getName() + " 加鎖 existsLockData:" + existsLockData);
    //鎖重入
    if (existsLockData != null && existsLockData.isLockSuccess()) {
        int lockCount = existsLockData.lockCount.incrementAndGet();
        if (lockCount < 0) {
            throw new Error("超出etcd鎖可重入次數限制");
        }
        return;
    }
    //建立租約,記錄租約id
    long leaseId;
    try {
        leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
        //續租心跳週期
        long period = leaseTTL - leaseTTL / 5;
        //啟動定時續約
        scheduledExecutorService.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId),
                initialDelay,
                period,
                TimeUnit.NANOSECONDS);

        //加鎖
        LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
        if (lockResponse != null) {
            lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
            LOGGER.info("執行緒:{} 加鎖成功,鎖路徑:{}", currentThread.getName(), lockPath);
        }

        //加鎖成功,設定鎖物件
        LockData lockData = new LockData(lockKey, currentThread);
        lockData.setLeaseId(leaseId);
        lockData.setService(scheduledExecutorService);
        threadData.put(currentThread, lockData);
        lockData.setLockSuccess(true);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

簡而言之,加鎖的程式碼就是按照如下步驟來的:

檢查鎖重入性
設定租約
開啟定時任務心跳檢查
阻塞獲取鎖
加鎖成功,設定鎖物件

業務處理完成(扣減庫存)後,解鎖:

@Override
public void unlock() {
    Thread currentThread = Thread.currentThread();
    //System.out.println(currentThread.getName() + " 釋放鎖..");
    LockData lockData = threadData.get(currentThread);
    //System.out.println(currentThread.getName() + " lockData " + lockData);
    if (lockData == null) {
        throw new IllegalMonitorStateException("執行緒:" + currentThread.getName() + " 沒有獲得鎖,lockKey:" + lockKey);
    }
    int lockCount = lockData.lockCount.decrementAndGet();
    if (lockCount > 0) {
        return;
    }
    if (lockCount < 0) {
        throw new IllegalMonitorStateException("執行緒:" + currentThread.getName() + " 鎖次數為負數,lockKey:" + lockKey);
    }
    try {
        //正常釋放鎖
        if (lockPath != null) {
            lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
        }
        //關閉續約的定時任務
        lockData.getService().shutdown();
        //刪除租約
        if (lockData.getLeaseId() != 0L) {
            leaseClient.revoke(lockData.getLeaseId());
        }
    } catch (InterruptedException | ExecutionException e) {
        //e.printStackTrace();
        LOGGER.error("執行緒:" + currentThread.getName() + "解鎖失敗。", e);
    } finally {
        //移除當前執行緒資源
        threadData.remove(currentThread);
    }
    LOGGER.info("執行緒:{} 釋放鎖", currentThread.getName());
}

解鎖過程:

重入性檢查
移除當前鎖的節點路徑釋放鎖
清除重入的執行緒資源

介面測試

/**
 * @program: distributed-lock
 * @description: etcd分散式鎖演示-高並行下庫存扣減
 * @author: 行百里者
 * @create: 2020/10/15 13:24
 **/
@RestController
public class StockController {

    private final StringRedisTemplate redisTemplate;

    @Value("${server.port}")
    private String port;

    @Value("${etcd.lockPath}")
    private String lockKey;

    private final Client etcdClient;

    public StockController(StringRedisTemplate redisTemplate, @Value("${etcd.servers}") String servers) {
        //System.out.println("etcd servers:" + servers);
        this.redisTemplate = redisTemplate;
        this.etcdClient = Client.builder().endpoints(servers.split(",")).build();
    }

    @RequestMapping("/stock/reduce")
    public String reduceStock() {
        Lock lock = new EtcdDistributedLock(etcdClient, lockKey, 30L, TimeUnit.SECONDS);
        //獲得鎖
        lock.lock();
        //扣減庫存
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            //同時lucky+1
            redisTemplate.opsForValue().increment("lucky");
        } else {
            System.out.println("庫存不足");
        }
        //釋放鎖
        lock.unlock();
        return port + " reduce stock end!";
    }
}

這個就很簡單了,當一個請求打進來,先試圖上鎖,上鎖成功後,執行業務,扣減庫存,同時訂單資訊+1,業務處理完成後,釋放鎖。
壓力測試
測試介面已經完成,用JMeter模擬高並行場景,在同一時刻同時傳送500個請求(庫存只有300),觀察結果。
先啟動兩個服務,一個8080,一個8090:

在這裡插入圖片描述
設定nginx(主要為了方便模擬高並行和分散式):在這裡插入圖片描述
nginx的IP地址是192.168.2.10:在這裡插入圖片描述
因此,我們壓力測試,只需要向 http://192.168.2.10/stock/reduce 介面傳送請求即可。

在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
執行壓測結果:在這裡插入圖片描述
在這裡插入圖片描述
從結果表明我們的etcd分散式鎖成功!

總結

最新2020整理收集的一些面試題(都整理成檔案),有很多幹貨,包含netty,spring,執行緒,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感覺在面試這塊講的非常清楚:獲取面試資料只需:點選這裡領取!!! 暗號:CSDN
在這裡插入圖片描述