我在上一篇文章聊了Redisson的可重入鎖,這次繼續來聊聊Redisson的公平鎖。下面是官方原話:
它保證了當多個Redisson使用者端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒。所有請求執行緒會在一個佇列中排隊,當某個執行緒出現宕機時,Redisson會等待5秒後繼續下一個執行緒,也就是說如果前面有5個執行緒都處於等待狀態,那麼後面的執行緒會等待至少25秒。
原始碼版本:3.17.7
這是我 fork 的分支,新增了自己理解的中文註釋:https://github.com/xiaoguyu/redisson
先上官方例子:
RLock fairLock = redisson.getFairLock("anyLock");
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
因為在Redisson中,公平鎖和普通可重入鎖的邏輯大體上一樣,我在上一篇文章都介紹了,這裡就不再贅述。下面開始介紹合理邏輯。
加鎖的 lua 指令碼在 RedissonFairLock#tryLockInnerAsync
方法中
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
long wait = threadWaitTime;
if (waitTime > 0) {
wait = unit.toMillis(waitTime);
}
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
......
}
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do " + // list為空,證明沒有人排隊,退出迴圈
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// 能到這裡,證明有人排隊,拿出在排隊的第一個人的超時時間,如果超時了,則移除相應資料
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// check if the lock can be acquired now
// 檢查是否可以獲取鎖。如果hash和list都不存在,或者執行緒佇列的第一個是當前執行緒,則可以獲取鎖
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
// 都獲取鎖了,當然要從執行緒佇列和時間佇列中移除
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
// 重新整理時間集合中的時間
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
// 和公平鎖的設定一樣,值加1並且設定過期時間
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// check if the lock is already held, and this is a re-entry
// 能到這裡,證明前面拿不到鎖,但是也要做可重入鎖的處理
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// the lock cannot be acquired
// check if the thread is already in the queue
// 時間集合中有值,證明執行緒已經在佇列中,不需要往後執行邏輯了
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
// 因為下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
// 所以這裡的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
// 如果最後一個執行緒不是當前執行緒,則從時間集合取出(舉例:執行緒1/2/3按順序獲取鎖,此時pttl得到的是執行緒1的鎖過期時間,zscore拿到的是執行緒2的鎖的過期時間,此時執行緒3應該以執行緒2的為準)
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
// 否則直接獲取鎖的存活時間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
// 過期時間 = 鎖存活時間 + 等待時間 + 當前時間戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
// 如果新增到時間集合成功,則同時新增執行緒集合
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);
}
throw new IllegalArgumentException();
}
公平鎖總共用了Redis的三種資料型別,對應著 lua 指令碼裡面的keys1、2、3的引數:
KEYS[1]
鎖的名字,使用 Hash 資料型別,是可重入鎖的基礎,結構為 {」threadId1」: 1, 「thread2」: 1},key為執行緒id,value是鎖的次數
KEYS[2]
執行緒佇列的名字,使用 List 資料型別,結構為 [ 「threadId1」, 「threadId2」 ],按順序存放需要獲取鎖的執行緒的id
KEYS[3]
時間佇列的名字,使用 sorted set 資料型別,結構為 {」threadId2」:123, 「threadId1」:190},key為執行緒id,value為獲取鎖的超時時間戳
我下面會用 鎖、執行緒佇列、時間佇列 來表示這3個資料結構,需要注意下我的表述。
同樣的,介紹下引數:
接下來,我們一段一段分析 lua 指令碼,首先看最開始的 while 迴圈
"while true do " + // list為空,證明沒有人排隊,退出迴圈
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// 能到這裡,證明有人排隊,拿出在排隊的第一個人的超時時間,如果超時了,則移除相應資料
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// 從時間佇列和執行緒佇列中移除
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
具體的邏輯我在註釋中寫的很清楚了,看的時候記住 KEYS[2]、KEYS[3] 對應著執行緒佇列和時間佇列介面。主要注意的是,執行緒佇列只有當一個執行緒持有鎖,另一個執行緒獲取不到鎖時,才會有值(前面有人才排隊,沒人排什麼隊)。接著看第二段
// 檢查是否可以獲取鎖。當鎖不存在,並且執行緒佇列不存在或者執行緒佇列第一位是當前執行緒,則可以獲取鎖
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
// 都獲取鎖了,當然要從執行緒佇列和時間佇列中移除
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
// 重新整理時間佇列中的時間
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
// 和公平鎖的設定一樣,值加1並且設定過期時間
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
翻譯翻譯就是,鎖不存在(別人沒有持有鎖)並且執行緒佇列不存在或者執行緒佇列第一位是當前執行緒(不用排隊或者自己排第一)才能獲得鎖。因為時間佇列中存放的是各個執行緒等待鎖的超時時間戳,所以每次都需要重新整理下。繼續下一段邏輯
// 能到這裡,證明前面拿不到鎖,但是也要做可重入鎖的處理
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
這是可重入鎖的處理,繼續下一段
// 時間佇列中有值,證明執行緒已經在佇列中,不需要往後執行邏輯了
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
// 因為下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
// 所以這裡的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
舉例子:執行緒1持有鎖,執行緒2嘗試第一次獲取鎖(不進入這段if),執行緒2第二次獲取鎖(進入了這段if)。繼續下一段
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
// 如果最後一個執行緒不是當前執行緒,則從時間集合取出(舉例:執行緒1/2/3按順序獲取鎖,此時pttl得到的是執行緒1的鎖過期時間,zscore拿到的是執行緒2的鎖的過期時間,此時執行緒3應該以執行緒2的為準)
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
// 否則直接獲取鎖的存活時間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
// 過期時間 = 鎖存活時間 + 等待時間 + 當前時間戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
// 如果新增到時間集合成功,則同時新增執行緒集合
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
ttl 這段的獲取邏輯,翻譯翻譯就是,如果前面有人排隊,就以前面的超時時間為準,如果沒人排隊,就拿鎖的超時時間。獲取到 ttl ,就對新增到執行緒集合和時間集合。
以上就是公平鎖的加鎖 lua 指令碼的全部邏輯。講的有點亂,但是隻要能搞清楚keys1、2、3對應著哪種資料型別,理解整個邏輯應該問題不大。
解鎖的核心 lua 指令碼是下面這段RedissonFairLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads
"while true do " // 執行緒佇列為空,證明沒有人排隊,退出迴圈
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
// 能到這裡,證明有人排隊,拿出在排隊的第一個人的超時時間,如果超時了,則移除相應資料
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
// 如果鎖不存在,則通過訂閱釋出機制通知下一個等待中的執行緒
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
// 如果當前執行緒已經不存在鎖裡面,直接返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 可重入鎖處理邏輯,對當前執行緒的鎖次數減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
// 鎖次數仍然大於0,則重新整理鎖的存活時間
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
// 刪除鎖
"redis.call('del', KEYS[1]); " +
// 訂閱釋出機制通知下一個等待中的執行緒
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
算了,不想寫了,看註釋吧。
本文介紹了Redisson的公平鎖,邏輯大體上和普通可重入鎖一致,核心在於 lua 指令碼,運用了Redis的3種資料型別。