.NET開源分散式鎖DistributedLock

2023-04-28 18:00:40

一、執行緒鎖和分散式鎖

執行緒鎖通常在單個程序中使用,以防止多個執行緒同時存取共用資源。

在我們.NET中常見的執行緒鎖有:

  • 自旋鎖:當執行緒嘗試獲取鎖時,它會重複執行一些簡單的指令,直到鎖可用
  • 互斥鎖: Mutex,可以跨程序使用。Mutex 類定義了一個互斥體物件,可以使用 WaitOne() 方法等待物件上的鎖
  • 混合鎖:Monitor,可以通過 lock 關鍵字來使用
  • 讀寫鎖:允許多個執行緒同時讀取共用資源,但只允許單個執行緒寫入共用資源
  • 號誌:Semaphore,它允許多個執行緒同時存取同一個資源

更多的執行緒同步鎖,可以看這篇文章:https://www.cnblogs.com/Z7TS/p/16463494.html

分散式鎖是一種用於協調多個程序/節點之間的並行存取的機制,某個資源在同一時刻只能被一個應用所使用,可以通過一些共用的外部儲存系統來實現跨程序的同步和互斥

常見的分散式鎖實現:

  • Redis 分散式鎖
  • ZooKeeper 分散式鎖
  • Mysql 分散式鎖
  • SqlServer 分散式鎖
  • 檔案分散式鎖

DistributedLock開源專案中有多種實現方式,我們今天主要討論Redis中的分散式鎖實現。

二、Redis分散式鎖的實現原理

基礎實現

Redis 本身可以被多個使用者端共用存取,正好就是一個共用儲存系統,可以用來儲存分散式鎖,而且 Redis 的讀寫效能高,可以應對高並行的鎖操作場景。

Redis 的 SET 命令有個 NX 引數可以實現「key不存在才插入」,所以可以用它來實現分散式鎖:

  • 如果 key 不存在,則顯示插入成功,可以用來表示加鎖成功;
  • 如果 key 存在,則會顯示插入失敗,可以用來表示加鎖失敗。
SET lock_keyunique_value NX PX 10000 
  • lock_key 就是 key 鍵;
  • unique_value 是使用者端生成的唯一的標識,區分來自不同使用者端的鎖操作;
  • NX 代表只在 lock_key 不存在時,才對 lock_key 進行設定操作;
  • PX 10000 表示設定 lock_key 的過期時間為 10s,這是為了避免使用者端發生異常而無法釋放鎖。

釋放鎖的時候需要刪除key,或者使用lua指令碼來保證原子性。

// 釋放鎖時,先比較 unique_value 是否相等,避免鎖的誤釋放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

續租機制

基於上文中的實現方式,我們在設定key過期時間時,不能準確的描述業務處理時間。為了防止因為業務處理時間較長導致鎖過期而提前釋放鎖,通過不斷更新鎖的過期時間來保持鎖的有效性,避免了因鎖過期而導致的並行問題。

關於這個問題,目前常見的解決方法有兩種:

1、實現自動續租機制:額外起一個執行緒,定期檢查執行緒是否還持有鎖,如果有則延長過期時間。DistributedLock裡面就實現了這個方案,使用「看門狗」定期檢查(每1/3的鎖時間檢查1次),如果執行緒還持有鎖,則重新整理過期時間。

2、實現快速失敗機制:當我們解鎖時發現鎖已經被其他執行緒獲取了,說明此時我們執行的操作已經是「不安全」的了,此時需要進行回滾,並返回失敗。

以下是使用StackExchange.Redis 庫實現分散式鎖和續租機制的範例程式碼:

public class RedisLock
{
    private readonly IDatabase _database;
    private readonly string _lockKey;
    private string _lockValue;
    private readonly TimeSpan _lockTimeout;

    private readonly TimeSpan _renewInterval;
    private bool _isLocked;

    public RedisLock(IDatabase database, string lockKey, TimeSpan lockTimeout, TimeSpan renewInterval)
    {
        _database = database;
        _lockKey = lockKey;
        _lockTimeout = lockTimeout;
        _renewInterval = renewInterval;
    }

    //嘗試獲取鎖,如果成功,則啟動一個續租執行緒
    public async Task<bool> AcquireAsync()
    {
        _lockValue = Guid.NewGuid().ToString();
        var acquired = await _database.StringSetAsync(_lockKey, _lockValue, _lockTimeout, When.NotExists);
        if (acquired)
        {
            _isLocked = true;
            StartRenewal();
        }
        return acquired;
    }

    //定期使用 KeyExpireAsync 命令重置鍵的過期時間,從而實現續租機制
    private async void StartRenewal()
    {
        while (_isLocked)
        {
            await Task.Delay(_renewInterval);
            await _database.KeyExpireAsync(_lockKey, _lockTimeout);
        }
    }
}

RedLock

Redlock 是一種分散式鎖實現方案,它的設計目標是解決 Redis 叢集模式下的分散式鎖並行控制問題。

它是基於多個 Redis 節點的分散式鎖,即使有節點發生了故障,鎖變數仍然是存在的,使用者端還是可以完成鎖操作

Redlock 演演算法加鎖三個過程:

  1. 使用者端獲取當前時間(t1)。
  2. 使用者端按順序依次向 N 個 Redis 節點(官方推薦是至少部署 5 個 Redis 節點)執行加鎖操作:
  • 加鎖操作使用 SET 命令,帶上 NX,EX/PX 選項,以及帶上使用者端的唯一標識。
  • 如果某個 Redis 節點發生故障了,為了保證在這種情況下,Redlock 演演算法能夠繼續執行,我們需要給「加鎖操作」設定一個超時時間(不是對「鎖」設定超時時間,而是對「加鎖操作」設定超時時間),加鎖操作的超時時間需要遠遠地小於鎖的過期時間,一般也就是設定為幾十毫秒。
  1. 一旦使用者端從超過半數(大於等於 N/2+1)的 Redis 節點上成功獲取到了鎖,就再次獲取當前時間(t2),然後計算計算整個加鎖過程的總耗時(t2-t1)。如果 t2-t1 < 鎖的過期時間,此時,認為使用者端加鎖成功,否則認為加鎖失敗。

加鎖成功後,使用者端需要重新計算這把鎖的有效時間,計算的結果是「鎖最初設定的過期時間」減去「使用者端從大多數節點獲取鎖的總耗時(t2-t1)」。如果計算的結果已經來不及完成共用資料的操作了,我們可以釋放鎖,以免出現還沒完成資料操作,鎖就過期了的情況。

加鎖失敗後,使用者端向所有 Redis 節點發起釋放鎖的操作,釋放鎖的操作和在單節點上釋放鎖的操作一樣,只要執行釋放鎖的 Lua 指令碼就可以了。

三、DistributedLock開源專案簡介

專案介紹

DistributedLock 是一個 .NET 庫,它基於各種底層技術提供強大且易於使用的分散式互斥體、讀寫器鎖和號誌。

DistributedLock 包含基於各種技術的實現;可以單獨安裝實現包,也可以只安裝 DistributedLock NuGet 包,這是一個「元」包,其中包含所有實現作為依賴項。請注意,每個包都根據 SemVer 獨立進行版本控制。

基礎使用

以下兩種方法,都是基於RedLock來實現的,在單機上,使用了續租機制,更多細節可以自己觀看原始碼,下文中會簡單介紹原始碼。

  • Acquire 方法

Acquire 方法返回一個代表持有鎖的「控制程式碼」物件。當控制程式碼被處理時,鎖被釋放:

  var redisDistributedLock = new RedisDistributedLock(name, connectionString); 
  using (redisDistributedLock.Acquire())
  {
      //持有鎖
  } //釋放鎖及相關資源
  • TryAcquire 方法

雖然 Acquire 將阻塞直到鎖可用,但還有一個 TryAcquire 變體,如果無法獲取鎖(由於在別處持有),則返回 null :

using (var handle = redisDistributedLock.TryAcquire())
{
    if (handle != null)
    {
        // 我們獲得鎖
    }
    else
    {
        // 別人獲得鎖
    }
}

支援非同步和依賴注入,依賴注入:

// Startup.cs:
services.AddSingleton<IDistributedLockProvider>(_ => new PostgresDistributedSynchronizationProvider(myConnectionString));
services.AddTransient<SomeService>();

// SomeService.cs
public class SomeService
{
    private readonly IDistributedLockProvider _synchronizationProvider;

    public SomeService(IDistributedLockProvider synchronizationProvider)
    {
        this._synchronizationProvider = synchronizationProvider;
    }

    public void InitializeUserAccount(int id)
    {
        // 通過provider構造lock
        var @lock = this._synchronizationProvider.CreateLock($"UserAccount{id}");
        using (@lock.Acquire())
        {
            // 
        }
      
        using (this._synchronizationProvider.AcquireLock($"UserAccount{id}"))
        {
            // 
        }
    }
}

四、淺析DistributedLock的Redis實現

原始碼地址

https://github.com/madelson/DistributedLock

目錄解析

  • DistributedLock.Core 是專案的抽象類庫,基礎分散式鎖、讀寫鎖、號誌的Provider和介面。
  • 其它幾個類庫是用不同儲存系統的具體實現

Redis的實現過程

以下程式碼對原始碼,進行了刪減和修改,只想簡單的講述一下實現過程。

定義一個工廠介面,返回IDistributedLock,在依賴注入場景中,使用這個工廠介面可能會更加方便

public interface IDistributedLockProvider
{
    IDistributedLock CreateLock(string name);
}

IDistributedLock:定義了控制並行存取的基本操作。該介面支援同步和非同步方式獲取鎖,並提供超時和取消功能,以適應各種情況

public interface IDistributedLock
{
    // 唯一Name
    string Name { get; }
    // 獲取鎖的方法
    IDistributedSynchronizationHandle Acquire(TimeSpan? timeout = null, CancellationToken cancellationToken = default);

    //......
}

DistributedLock.Redis類庫,對Acquire的具體實現,該方法是嘗試獲取Redis分散式鎖範例。

  private async ValueTask<RedisDistributedLockHandle?> TryAcquireAsync(CancellationToken cancellationToken)
  {
      // 初始化Redis連線和相關引數
      //CreateLockId = $"{Environment.MachineName}_{currentProcess.Id}_" + Guid.NewGuid().ToString("n")
      var primitive = new RedisMutexPrimitive(this.Key, RedLockHelper.CreateLockId(), this._options.RedLockTimeouts);

      // 獲取和設定鎖
      var tryAcquireTasks = await new RedLockAcquire(primitive, this._databases, cancellationToken).TryAcquireAsync().ConfigureAwait(false);

      // 成功後,RedLockHandle這個裡邊實現了續租機制
      return tryAcquireTasks != null 
          ? new RedisDistributedLockHandle(new RedLockHandle(primitive, tryAcquireTasks, extensionCadence: this._options.ExtensionCadence, expiry: this._options.RedLockTimeouts.Expiry)) 
          : null;
  }

根據當前執行緒是否在同步上下文,對單庫和多庫實現進行區分和實現

// 該方法用於嘗試獲取分散式鎖,並返回一個表示各個資料庫節點獲取鎖狀態的任務字典
public async ValueTask<Dictionary<IDatabase, Task<bool>>?> TryAcquireAsync()
{
    // 檢查當前執行緒是否在同步上下文中執行,以便根據不同情況採取不同的獲取鎖策略
    if (SyncViaAsync.IsSynchronous&& this._databases.Count == 1)
        return this.TrySingleFullySynchronousAcquire();

    // 建立一個任務字典,將每個資料庫連線和其對應的獲取鎖任務關聯起來
    var tryAcquireTasks = this._databases.ToDictionary(
        db => db,
        db => Helpers.SafeCreateTask(state => state.primitive.TryAcquireAsync(state.db), (primitive, db))
    );

    // 等待所有獲取鎖任務完成,並返回一個表示整體狀態的任務
    var waitForAcquireTask = this.WaitForAcquireAsync(tryAcquireTasks).AwaitSyncOverAsync().ConfigureAwait(false);

    // 執行清理操作 
 
    // 返回結果
    return succeeded ? tryAcquireTasks : null;
}

單庫獲取Redis分散式鎖,就是通過set nx 設定值,返回bool,失敗就釋放資源,成功檢查是否超時。不超時就返回任務字典

private Dictionary<IDatabase, Task<bool>>? TrySingleFullySynchronousAcquire()
{
    var database = this._databases.Single();

    bool success;
    var stopwatch = Stopwatch.StartNew();

    // 通過StackExchange.Redis的StringSet進行無值設定key(set nx)
    try { success = this._primitive.TryAcquire(database); }
    catch
    {
        // 確保釋放鎖,以便防止出現死鎖等問題。然後重新丟擲異常
    }

    if (success)
    {
        // 檢查是否在超時時間內,並返回一個包含成功狀態的任務字典;否則繼續釋放鎖並返回null
    }

    return null;
}

多庫中是否獲取到分散式鎖

private async Task<bool> WaitForAcquireAsync(IReadOnlyDictionary<IDatabase, Task<bool>> tryAcquireTasks)
{
    // 超時或取消時自動停止等待

    using var timeout = new TimeoutTask(this._primitive.AcquireTimeout, this._cancellationToken);
    var incompleteTasks = new HashSet<Task>(tryAcquireTasks.Values) { timeout.Task };

    // 計數器
    var successCount = 0;
    var failCount = 0;
    var faultCount = 0;

    while (true)
    {
        // 不斷等待任務完成,如果任務為timeout,則表示超時;否則需要根據任務的狀態和訊號來判斷是否成功獲取鎖
        var completed = await Task.WhenAny(incompleteTasks).ConfigureAwait(false);

        if (completed == timeout.Task)
            return false; // 超時

        // 判斷是否超過成功或者失敗的閥值,是否超過1/2
        if (completed.Status == TaskStatus.RanToCompletion)
        {
            var result = await ((Task<bool>)completed).ConfigureAwait(false);

            if (result)
            {
                ++successCount;
                // 是否超過1/2的庫
                if (RedLockHelper.HasSufficientSuccesses(successCount, this._databases.Count)) { return true; }
            }
            else
            {
                ++failCount;
                if (RedLockHelper.HasTooManyFailuresOrFaults(failCount, this._databases.Count)) { return false; }
            }
        }
        else 
        {      
            ++faultCount;
            // ......          
        }
        // ......
    }
}

截止到目前,我們就知道如何獲取和設定分散式鎖了。接下來我們就看下是如何實現續租機制的。就是LeaseMonitor這個物件。

private static Task CreateMonitoringLoopTask(WeakReference<LeaseMonitor> weakMonitor, TimeoutValue monitoringCadence, CancellationToken disposalToken)
{
    // 建立監視任務
    return Task.Run(() => MonitoringLoop());

    async Task MonitoringLoop()
    {
        var leaseLifetime = Stopwatch.StartNew();
        do
        {
            await Task.Delay(monitoringCadence.InMilliseconds, disposalToken).TryAwait();
        }
        // 檢查RedLock租約的狀態和可用性
        while (!disposalToken.IsCancellationRequested && await RunMonitoringLoopIterationAsync(weakMonitor, leaseLifetime).ConfigureAwait(false));
    }
}

RunMonitoringLoopIterationAsync 裡邊最終呼叫了續時的lua指令碼

你們在公司中,都是如何實現分散式鎖的呢?可以在評論區留下您寶貴的建議。