執行緒鎖通常在單個程序中使用,以防止多個執行緒同時存取共用資源。
在我們.NET中常見的執行緒鎖有:
更多的執行緒同步鎖,可以看這篇文章:https://www.cnblogs.com/Z7TS/p/16463494.html
分散式鎖是一種用於協調多個程序/節點之間的並行存取的機制,某個資源在同一時刻只能被一個應用所使用,可以通過一些共用的外部儲存系統來實現跨程序的同步和互斥
常見的分散式鎖實現:
DistributedLock開源專案中有多種實現方式,我們今天主要討論Redis中的分散式鎖實現。
Redis 本身可以被多個使用者端共用存取,正好就是一個共用儲存系統,可以用來儲存分散式鎖,而且 Redis 的讀寫效能高,可以應對高並行的鎖操作場景。
Redis 的 SET 命令有個 NX 引數可以實現「key不存在才插入」,所以可以用它來實現分散式鎖:
SET lock_keyunique_value NX PX 10000
釋放鎖的時候需要刪除key,或者使用lua指令碼來保證原子性。
// 釋放鎖時,先比較 unique_value 是否相等,避免鎖的誤釋放
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
基於上文中的實現方式,我們在設定key過期時間時,不能準確的描述業務處理時間。為了防止因為業務處理時間較長導致鎖過期而提前釋放鎖,通過不斷更新鎖的過期時間來保持鎖的有效性,避免了因鎖過期而導致的並行問題。
關於這個問題,目前常見的解決方法有兩種:
1、實現自動續租機制:額外起一個執行緒,定期檢查執行緒是否還持有鎖,如果有則延長過期時間。DistributedLock裡面就實現了這個方案,使用「看門狗」定期檢查(每1/3的鎖時間檢查1次),如果執行緒還持有鎖,則重新整理過期時間。
2、實現快速失敗機制:當我們解鎖時發現鎖已經被其他執行緒獲取了,說明此時我們執行的操作已經是「不安全」的了,此時需要進行回滾,並返回失敗。
以下是使用StackExchange.Redis 庫實現分散式鎖和續租機制的範例程式碼:
public class RedisLock
{
private readonly IDatabase _database;
private readonly string _lockKey;
private string _lockValue;
private readonly TimeSpan _lockTimeout;
private readonly TimeSpan _renewInterval;
private bool _isLocked;
public RedisLock(IDatabase database, string lockKey, TimeSpan lockTimeout, TimeSpan renewInterval)
{
_database = database;
_lockKey = lockKey;
_lockTimeout = lockTimeout;
_renewInterval = renewInterval;
}
//嘗試獲取鎖,如果成功,則啟動一個續租執行緒
public async Task<bool> AcquireAsync()
{
_lockValue = Guid.NewGuid().ToString();
var acquired = await _database.StringSetAsync(_lockKey, _lockValue, _lockTimeout, When.NotExists);
if (acquired)
{
_isLocked = true;
StartRenewal();
}
return acquired;
}
//定期使用 KeyExpireAsync 命令重置鍵的過期時間,從而實現續租機制
private async void StartRenewal()
{
while (_isLocked)
{
await Task.Delay(_renewInterval);
await _database.KeyExpireAsync(_lockKey, _lockTimeout);
}
}
}
Redlock 是一種分散式鎖實現方案,它的設計目標是解決 Redis 叢集模式下的分散式鎖並行控制問題。
它是基於多個 Redis 節點的分散式鎖,即使有節點發生了故障,鎖變數仍然是存在的,使用者端還是可以完成鎖操作
Redlock 演演算法加鎖三個過程:
加鎖成功後,使用者端需要重新計算這把鎖的有效時間,計算的結果是「鎖最初設定的過期時間」減去「使用者端從大多數節點獲取鎖的總耗時(t2-t1)」。如果計算的結果已經來不及完成共用資料的操作了,我們可以釋放鎖,以免出現還沒完成資料操作,鎖就過期了的情況。
加鎖失敗後,使用者端向所有 Redis 節點發起釋放鎖的操作,釋放鎖的操作和在單節點上釋放鎖的操作一樣,只要執行釋放鎖的 Lua 指令碼就可以了。
DistributedLock 是一個 .NET 庫,它基於各種底層技術提供強大且易於使用的分散式互斥體、讀寫器鎖和號誌。
DistributedLock 包含基於各種技術的實現;可以單獨安裝實現包,也可以只安裝 DistributedLock NuGet 包,這是一個「元」包,其中包含所有實現作為依賴項。請注意,每個包都根據 SemVer 獨立進行版本控制。
以下兩種方法,都是基於RedLock來實現的,在單機上,使用了續租機制,更多細節可以自己觀看原始碼,下文中會簡單介紹原始碼。
Acquire 方法返回一個代表持有鎖的「控制程式碼」物件。當控制程式碼被處理時,鎖被釋放:
var redisDistributedLock = new RedisDistributedLock(name, connectionString);
using (redisDistributedLock.Acquire())
{
//持有鎖
} //釋放鎖及相關資源
雖然 Acquire 將阻塞直到鎖可用,但還有一個 TryAcquire 變體,如果無法獲取鎖(由於在別處持有),則返回 null :
using (var handle = redisDistributedLock.TryAcquire())
{
if (handle != null)
{
// 我們獲得鎖
}
else
{
// 別人獲得鎖
}
}
支援非同步和依賴注入,依賴注入:
// Startup.cs:
services.AddSingleton<IDistributedLockProvider>(_ => new PostgresDistributedSynchronizationProvider(myConnectionString));
services.AddTransient<SomeService>();
// SomeService.cs
public class SomeService
{
private readonly IDistributedLockProvider _synchronizationProvider;
public SomeService(IDistributedLockProvider synchronizationProvider)
{
this._synchronizationProvider = synchronizationProvider;
}
public void InitializeUserAccount(int id)
{
// 通過provider構造lock
var @lock = this._synchronizationProvider.CreateLock($"UserAccount{id}");
using (@lock.Acquire())
{
//
}
using (this._synchronizationProvider.AcquireLock($"UserAccount{id}"))
{
//
}
}
}
https://github.com/madelson/DistributedLock
以下程式碼對原始碼,進行了刪減和修改,只想簡單的講述一下實現過程。
定義一個工廠介面,返回IDistributedLock,在依賴注入場景中,使用這個工廠介面可能會更加方便
public interface IDistributedLockProvider
{
IDistributedLock CreateLock(string name);
}
IDistributedLock:定義了控制並行存取的基本操作。該介面支援同步和非同步方式獲取鎖,並提供超時和取消功能,以適應各種情況
public interface IDistributedLock
{
// 唯一Name
string Name { get; }
// 獲取鎖的方法
IDistributedSynchronizationHandle Acquire(TimeSpan? timeout = null, CancellationToken cancellationToken = default);
//......
}
DistributedLock.Redis類庫,對Acquire的具體實現,該方法是嘗試獲取Redis分散式鎖範例。
private async ValueTask<RedisDistributedLockHandle?> TryAcquireAsync(CancellationToken cancellationToken)
{
// 初始化Redis連線和相關引數
//CreateLockId = $"{Environment.MachineName}_{currentProcess.Id}_" + Guid.NewGuid().ToString("n")
var primitive = new RedisMutexPrimitive(this.Key, RedLockHelper.CreateLockId(), this._options.RedLockTimeouts);
// 獲取和設定鎖
var tryAcquireTasks = await new RedLockAcquire(primitive, this._databases, cancellationToken).TryAcquireAsync().ConfigureAwait(false);
// 成功後,RedLockHandle這個裡邊實現了續租機制
return tryAcquireTasks != null
? new RedisDistributedLockHandle(new RedLockHandle(primitive, tryAcquireTasks, extensionCadence: this._options.ExtensionCadence, expiry: this._options.RedLockTimeouts.Expiry))
: null;
}
根據當前執行緒是否在同步上下文,對單庫和多庫實現進行區分和實現
// 該方法用於嘗試獲取分散式鎖,並返回一個表示各個資料庫節點獲取鎖狀態的任務字典
public async ValueTask<Dictionary<IDatabase, Task<bool>>?> TryAcquireAsync()
{
// 檢查當前執行緒是否在同步上下文中執行,以便根據不同情況採取不同的獲取鎖策略
if (SyncViaAsync.IsSynchronous&& this._databases.Count == 1)
return this.TrySingleFullySynchronousAcquire();
// 建立一個任務字典,將每個資料庫連線和其對應的獲取鎖任務關聯起來
var tryAcquireTasks = this._databases.ToDictionary(
db => db,
db => Helpers.SafeCreateTask(state => state.primitive.TryAcquireAsync(state.db), (primitive, db))
);
// 等待所有獲取鎖任務完成,並返回一個表示整體狀態的任務
var waitForAcquireTask = this.WaitForAcquireAsync(tryAcquireTasks).AwaitSyncOverAsync().ConfigureAwait(false);
// 執行清理操作
// 返回結果
return succeeded ? tryAcquireTasks : null;
}
單庫獲取Redis分散式鎖,就是通過set nx 設定值,返回bool,失敗就釋放資源,成功檢查是否超時。不超時就返回任務字典
private Dictionary<IDatabase, Task<bool>>? TrySingleFullySynchronousAcquire()
{
var database = this._databases.Single();
bool success;
var stopwatch = Stopwatch.StartNew();
// 通過StackExchange.Redis的StringSet進行無值設定key(set nx)
try { success = this._primitive.TryAcquire(database); }
catch
{
// 確保釋放鎖,以便防止出現死鎖等問題。然後重新丟擲異常
}
if (success)
{
// 檢查是否在超時時間內,並返回一個包含成功狀態的任務字典;否則繼續釋放鎖並返回null
}
return null;
}
多庫中是否獲取到分散式鎖
private async Task<bool> WaitForAcquireAsync(IReadOnlyDictionary<IDatabase, Task<bool>> tryAcquireTasks)
{
// 超時或取消時自動停止等待
using var timeout = new TimeoutTask(this._primitive.AcquireTimeout, this._cancellationToken);
var incompleteTasks = new HashSet<Task>(tryAcquireTasks.Values) { timeout.Task };
// 計數器
var successCount = 0;
var failCount = 0;
var faultCount = 0;
while (true)
{
// 不斷等待任務完成,如果任務為timeout,則表示超時;否則需要根據任務的狀態和訊號來判斷是否成功獲取鎖
var completed = await Task.WhenAny(incompleteTasks).ConfigureAwait(false);
if (completed == timeout.Task)
return false; // 超時
// 判斷是否超過成功或者失敗的閥值,是否超過1/2
if (completed.Status == TaskStatus.RanToCompletion)
{
var result = await ((Task<bool>)completed).ConfigureAwait(false);
if (result)
{
++successCount;
// 是否超過1/2的庫
if (RedLockHelper.HasSufficientSuccesses(successCount, this._databases.Count)) { return true; }
}
else
{
++failCount;
if (RedLockHelper.HasTooManyFailuresOrFaults(failCount, this._databases.Count)) { return false; }
}
}
else
{
++faultCount;
// ......
}
// ......
}
}
截止到目前,我們就知道如何獲取和設定分散式鎖了。接下來我們就看下是如何實現續租機制的。就是LeaseMonitor這個物件。
private static Task CreateMonitoringLoopTask(WeakReference<LeaseMonitor> weakMonitor, TimeoutValue monitoringCadence, CancellationToken disposalToken)
{
// 建立監視任務
return Task.Run(() => MonitoringLoop());
async Task MonitoringLoop()
{
var leaseLifetime = Stopwatch.StartNew();
do
{
await Task.Delay(monitoringCadence.InMilliseconds, disposalToken).TryAwait();
}
// 檢查RedLock租約的狀態和可用性
while (!disposalToken.IsCancellationRequested && await RunMonitoringLoopIterationAsync(weakMonitor, leaseLifetime).ConfigureAwait(false));
}
}
RunMonitoringLoopIterationAsync 裡邊最終呼叫了續時的lua指令碼
你們在公司中,都是如何實現分散式鎖的呢?可以在評論區留下您寶貴的建議。