Redisson原始碼解讀-分散式鎖

2022-11-07 18:02:44

前言

Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory Data Grid)。Redisson有一樣功能是可重入的分散式鎖。本文來討論一下這個功能的特點以及原始碼分析。

前置知識

在講Redisson,咱們先來聊聊分散式鎖的特點以及Redis的釋出/訂閱機制,磨刀不誤砍柴工。

分散式鎖的思考

首先思考下,如果我們自己去實現一個分散式鎖,這個鎖需要具備哪些功能?

  1. 互斥(這是一個鎖最基本的功能)
  2. 鎖失效機制(也就是可以設定鎖定時長,防止死鎖)
  3. 高效能、高可用
  4. 阻塞、非阻塞
  5. 可重入、公平鎖
  6. 。。。

可見,實現一個分散式鎖,需要考慮的東西有很多。那麼,如果用Redis來實現分散式鎖呢?如果只需要具備上面說的1、2點功能,要怎麼寫?(ps:我就不寫了,自己想去)

Redis訂閱/釋出機制

Redisson中用到了Redis的訂閱/釋出機制,下面簡單介紹下。

簡單來說就是如果client2 、 client5 和 client1 訂閱了 channel1,當有訊息釋出到 channel1 的時候,client2 、 client5 和 client1 都會收到這個訊息。

圖片來自 菜鳥教學-Redis釋出訂閱

Redisson

原始碼版本:3.17.7

下面以Redisson官方的可重入同步鎖例子為入口,解讀下原始碼。

RLock lock = redisson.getLock("anyLock");
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

加鎖

我用時序圖來表示加鎖和訂閱的過程。時序圖中括號後面的c1、c2代表client1,client2

當執行緒2獲取了鎖但還沒釋放鎖時,如果執行緒1去獲取鎖,會阻塞等待,直到執行緒2解鎖,通過Redis的釋出訂閱機制喚醒執行緒1,再次去獲取鎖。

加鎖方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),對應著就是RedissonLock#tryLock

/**
 * 獲取鎖
 * @param waitTime  嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
 * @param leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設定為大於業務處理的時間,確保在鎖有效期內業務能處理完)
 * @param unit 時間單位
 * @return 獲取鎖成功返回true,失敗返回false
 */
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();// 當前時間
    long threadId = Thread.currentThread().getId();// 當前執行緒id

    // 嘗試加鎖,加鎖成功返回null,失敗返回鎖的剩餘超時時間
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // 獲取鎖成功
    if (ttl == null) {
        return true;
    }

    // time小於0代表此時已經超過獲取鎖的等待時間,直接返回false
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        // 沒看懂這個方法,裡面裡面空執行,有知道的大神還請不吝賜教
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.whenComplete((res, ex) -> {
                // 出現異常,取消訂閱
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        // 判斷是否超時(超過了waitTime)
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
    
        while (true) {
            // 再次獲取鎖,成功則返回
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // 阻塞等待號誌喚醒或者超時,接收到訂閱時喚醒
            // 使用的是Semaphore#tryAcquire()
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 因為是同步操作,所以無論加鎖成功或失敗,都取消訂閱
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

先看一下整體邏輯:

  1. 嘗試加鎖,成功直接返回true
  2. 判斷超時
  3. 訂閱
  4. 判斷超時
  5. 迴圈 ( 嘗試獲取鎖 → 判斷超時 → 阻塞等待 )

tryLock方法看著很長,但是有很多程式碼都是重複的,本小節重點說一下嘗試加鎖的方法tryAcquire

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        // 呼叫lua指令碼,嘗試加鎖
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 這裡的if、else的區別就在於,如果沒有設定leaseTime,就使用預設的internalLockLeaseTime(預設30秒)
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        // 如果ttlRemaining為空,也就是tryLockInnerAsync方法中的lua執行結果返回空,證明獲取鎖成功
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 如果沒有設定鎖的持有時間(leaseTime),則啟動看門狗,定時給鎖續期,防止業務邏輯未執行完成鎖就過期了
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

tryAcquireAsync方法中,主要分為兩段邏輯:

  1. 呼叫lua指令碼加鎖:tryLockInnerAsync
  2. 看門狗:scheduleExpirationRenewal

看門狗在後面講,本小節重點還是在加鎖

// RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

Redisson使用了 Hash 結構來表示一個鎖,這樣 Hash 裡面的 key 為執行緒id,value 為鎖的次數。這樣巧妙地解決了可重入鎖的問題。

下面我們來分析下這段 lua 指令碼的邏輯(下面說的threadId都是指變數,不是說key就叫’threadId’):

  1. 如果鎖(hash結構)不存在,則建立,並新增一個鍵值對 (threadId : 1),並設定鎖的過期時間
  2. 如果鎖存在,則將鍵值對 threadId 對應的值 + 1,並設定鎖的過期時間
  3. 如果不如何1,2點,則返回鎖的剩餘過期時間

訂閱

讓我們把視線重新回到RedissonLock#tryLock中,當經過一些嘗試獲取鎖,超時判斷之後,程式碼來到while迴圈中。這個while迴圈是個死迴圈,只有成功獲取鎖或者超時,才會退出。一般死迴圈的設計中,都會有阻塞等待的程式碼,否則如果迴圈中的邏輯短時間拿不到結果,會造成資源搶佔和浪費。阻塞程式碼就是下面這段

if (ttl >= 0 && ttl < time) {
    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一個Semaphore號誌物件,這是jdk的內建物件,Semaphore#tryAcquire表示阻塞並等待喚醒。那麼號誌什麼時候被喚醒呢?在訂閱方法中RedissonLock#subscribe。訂閱方法的邏輯也不少,咱們直接講其最終呼叫的處理方法

// LockPubSub#onMessage
protected void onMessage(RedissonLockEntry value, Long message) {
    // 普通的解鎖走的是這個
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // 這裡就是喚醒號誌的地方
        value.getLatch().release();
    // 這個是讀寫鎖用的
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }

        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

value.getLatch().release() 也就是Semaphore#release ,會喚醒Semaphore#tryAcquire阻塞的執行緒

解鎖

上面我們聊了加鎖,本小節來聊下解鎖。呼叫路徑如下

// RedissonLock#unlock
// RedissonBaseLock#unlockAsync(long threadId)
public RFuture<Void> unlockAsync(long threadId) {
    // 呼叫lua解鎖
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        // 取消看門狗
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

解鎖的邏輯不復雜,呼叫lua指令碼解鎖以及取消看門狗。看門狗晚點說,先說下lua解鎖

// RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

老規矩,分析下這段lua:

  1. 如果鎖不存在,返回null
  2. 鎖的值減1,如果鎖的值大於0(也就是可重入鎖仍然有加鎖次數),則重新設定過期時間
  3. 如果鎖的值小於等於0,這說明可以真正解鎖了,刪除鎖並通過釋出訂閱機制釋出解鎖訊息

從 lua 中可以看到,解鎖時會發布訊息到 channel 中,加鎖方法RedissonLock#tryLock中有相對應的訂閱操作。

看門狗

試想一個場景:程式執行需要10秒,程式執行完成才去解鎖,而鎖的存活時間只有5秒,也就是程式執行到一半的時候鎖就可以被其他程式獲取了,這顯然不合適。那麼怎麼解決呢?

  1. 方式一:鎖永遠存在,直到解鎖。不設定存活時間。

    這種方法的弊端在於,如果程式沒解鎖就掛了,鎖就成了死鎖

  2. 方式二:依然設定鎖存活時間,但是監控程式的執行,如果程式還沒有執行完成,則定期給鎖續期。

方式二就是Redisson的看門狗機制。看門狗只有在沒有顯示指定鎖的持有時間(leaseTime)時才會生效。

// RedissonLock#tryAcquireAsync
// RedissonBaseLock#scheduleExpirationRenewal
protected void scheduleExpirationRenewal(long threadId) {
    // 建立ExpirationEntry,並放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法會從map中再拿出來用
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 看門狗的具體邏輯
            renewExpiration();
        } finally {
            // 如果執行緒被中斷了,就取消看門狗
            if (Thread.currentThread().isInterrupted()) {
                // 取消看門狗
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

scheduleExpirationRenewal 方法處理了ExpirationEntry和如果出現異常則取消看門狗,具體看門狗邏輯在 renewExpiration 方法中

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    // 建立延時任務,延時時間是internalLockLeaseTime / 3
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // lua指令碼判斷,如果鎖存在,則續期並返回true,不存在則返回false
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // 鎖續期成功,則再啟動一個延時任務,繼續監測
                    renewExpiration();
                } else {
                    // 取消看門狗
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

Timeout 是一個延時任務,延時 internalLockLeaseTime / 3 時間執行。任務的內容主要是通過 renewExpirationAsync 方法對鎖進行續期,如果續期失敗(解鎖了、鎖到期等),則取消看門狗,如果續期成功,則遞迴 renewExpiration 方法,繼續建立延時任務。

internalLockLeaseTime 也就是 lockWatchdogTimeout 引數,預設是 30 秒。

總結

本文介紹了Redisson的加鎖、解鎖、看門狗機制,以及對Redis釋出訂閱機制的應用。因為篇幅有限,很多細節聊得不夠深入。此外Redisson的非同步機制、對Netty的使用等都是很值得水文章的。


參考資料

萬字長文帶你解讀Redisson分散式鎖的原始碼 - 知乎 (zhihu.com)

Redis分散式鎖-這一篇全瞭解(Redission實現分散式鎖完美方案)