相信使用過redis的,或者正在做分散式開發的童鞋都知道redisson元件,它的功能很多,但我們使用最頻繁的應該還是它的分散式鎖功能,少量的程式碼,卻實現了加鎖、鎖續命(看門狗)、鎖訂閱、解鎖、鎖等待(自旋)等功能,我們來看看都是如何實現的。
//獲取鎖物件
RLock redissonLock = redisson.getLock(lockKey);
//加分散式鎖
redissonLock.lock();
根據redissonLock.lock()
方法跟蹤到具體的private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)
方法,真正獲取加鎖的邏輯是在tryAcquireAsync
該方法中呼叫的tryLockInnerAsync()
方法,看看這個方法是怎麼實現的?
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 判斷是否存在分散式鎖,getName()也就是KEYS[1],也就是鎖key名
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 加鎖,執行hset 鎖key名 1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 設定過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 這個分支是redisson的重入鎖邏輯,鎖還在,鎖計數+1,重新設定過期時長
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回鎖的剩餘過期時長
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
發現底層是結合lua指令碼實現了加鎖邏輯。
為什麼底層結合了Lua指令碼?
Redis是在2.6推出了指令碼功能,允許開發者使用Lua語言編寫指令碼傳到redis執行。使用指令碼的好處如下:
1、減少網路開銷:本來5次網路請求的操作,可以用一個請求完成,原先5次請求的邏輯,可以一次性放到redis中執行,較少了網路往返時延。這點跟管道有點類似。
2、原子操作:Redis會將整個指令碼作為一個整體執行,中間不會被其他命令插入。管道不是原子的,不過
redis的批次操作命令(類似mset)是原子的。
也就意味著雖然指令碼中有多條redis指令,那即使有多條執行緒並行執行,在同一時刻也只有一個執行緒能夠執行這段邏輯,等這段邏輯執行完,分散式鎖也就獲取到了,其它執行緒再進來就獲取不到分散式鎖了。
大家都聽過鎖續命,肯定也知道這裡涉及到看門狗的概念。在呼叫tryLockInnerAsync()
方法時,第一個引數是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
也就是預設的看門狗過期時間是private long lockWatchdogTimeout = 30 * 1000
毫秒。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 新增監聽器,判斷獲取鎖是否成功,成功的話,新增定時任務:定期更新鎖過期時間
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
// 根據tryLockInnerAsync方法,加鎖成功,return nil 也就是null
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 新增定時任務:定期更新鎖過期時間
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
當執行緒獲取到鎖後,會進入if (ttlRemaining == null)
分支,呼叫定期更新鎖過期時間scheduleExpirationRenewal
方法,我們看看該方法實現:
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 檢測KEYS[1]鎖是否還在,在的話再次設定過期時間
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
// 通過上面lua指令碼執行後會返回1,也就true,再次呼叫更新過期時間進行續期
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
// 延遲 internalLockLeaseTime / 3再執行續命
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
發現scheduleExpirationRenewal
方法只是用了Timeout作為任務,並沒有使用java的Timer()之類的定時器,而是在Timeout任務run()方法中定義了RFuture物件,通過給RFuture物件設定listener,在listener中通過Lua指令碼執行結果進行判斷是否還需要進行續期。通過這樣的方式來給分散式鎖進行續期。
這種方式實現定時更新確實很巧妙,定期時間很靈活。
鎖訂閱是針對那些沒有獲取到分散式鎖的執行緒而言的。來看看整個獲取鎖的方法:
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired,獲取到鎖,直接退出
if (ttl == null) {
return;
}
// 沒有獲取到鎖,進行訂閱
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
當第一個執行緒獲取到鎖後,會在if (ttl == null)
分支進行返回,第二個及以後的執行緒進來在沒獲取到鎖時,只能接著走下面的邏輯,進行鎖的訂閱。
接著進入到一個while迴圈,首先還是會進行一次嘗試獲取鎖(萬一此時第一個執行緒已經釋放鎖了呢),通過tryAcquire(leaseTime, unit, threadId)
方法,如果沒有獲取到鎖的話,會返回鎖的剩餘過期時間,如果剩餘過期時間大於0,則當前執行緒通過Semaphore
訊號號,將當前執行緒阻塞,底層執行LockSupport.parkNanos(this, nanosTimeout)
執行緒掛起剩餘過期時間後,會自動進行喚醒,再次執行tryAcquire
嘗試獲取鎖。所有沒有獲取到鎖的執行緒都會執行這個流程。
一定要等待剩餘過期時間後才喚醒嗎?
假設執行緒一獲取到鎖,過期時間預設為30s,當前執行業務邏輯已經過了5s,那其他執行緒走到這裡,則需要等待25s後才行進行喚醒,那萬一執行緒一執行業務邏輯只要10s,那其他執行緒還需要等待20s嗎?這樣豈不是導致效率很低?
答案是否定的,詳細看解鎖邏輯。
解鎖:redissonLock.unlock();
我們來看看具體的解鎖邏輯:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 鎖不存在,釋出unlockMessage解鎖訊息,通知其他等待執行緒
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 不存在該鎖,異常捕捉
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// redisson可重入鎖計數-1,依舊>0,則重新設定過期時間
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// redis刪除鎖,釋出unlockMessage解鎖訊息,通知其他等待執行緒
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
發現解鎖邏輯底層也是用了一個lua指令碼實現。具體的說明可以看程式碼註釋,刪除鎖後,並行布解鎖訊息,通知到其它執行緒,也就意味著不會其它等待的執行緒一直等待。
Semophore
號誌的訂閱中有個onMessage
方法,
protected void onMessage(RedissonLockEntry value, Long message) {
// 喚醒執行緒
value.getLatch().release(message.intValue());
while (true) {
Runnable runnableToExecute = null;
synchronized (value) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
}
if (runnableToExecute != null) {
runnableToExecute.run();
} else {
return;
}
}
}
解鎖後通過if (opStatus)
分支取消鎖續期邏輯。
總的來說,可以藉助一張圖加深理解:
分散式鎖的整體實現很巧妙,藉助lua指令碼的原子性,實現了很多功能,當然redisson還有其它很多功能,比如為了解決主從叢集中的非同步複製會導致鎖丟失問題,引入了redlock機制,還有分散式下的可重入鎖等。