Redis
是一款強大的高效能鍵值儲存資料庫,也是目前NOSQL
中最流行比較流行的一款資料庫,它在廣泛的應用場景中扮演著至關重要的角色,包括但不限於快取、訊息佇列、對談儲存等。在本文中,我們將介紹如何基於C# Socket
來實現一個簡單的Redis使用者端類RedisClient
,來演示構建請求和輸出的相關通訊機制。需要注意的是本文只是著重展示如何基於原生的Socket
方式與Redis Server
進行通訊,並不是構建一個強大的Redis開發工具包
。
Redis(Remote Dictionary Server)
是一個記憶體資料庫,它支援了非常豐富的資料結構,包括字串、列表、集合、雜湊、有序集合等。Redis 提供了高效能的讀寫操作,可以用於快取資料、訊息佇列、分散式鎖、對談管理等多種用途。Redis 通常以鍵值對的方式儲存資料,每個鍵都與一個值相關聯,值的型別可以是字串、列表、雜湊等。Redis
不僅提供了豐富的命令集,用於操作儲存在資料庫中的資料,還提供了Redis serialization protocol (RESP)
協定來解析Redis Server
返回的資料。相關的檔案地址如下所示:
Redis命令
是與Redis伺服器進行通訊的主要方式,通俗點就是傳送指定格式的指令用於執行各種操作,包括資料儲存、檢索、修改和刪除等。以下是一些日常使用過程中常見的Redis命令及其用途:
GET 和 SET 命令
GET key
: 用於獲取指定鍵的值。SET key value
: 用於設定指定鍵的值.DEL 命令
DEL key
: 用於刪除指定鍵.EXPIRE 和 TTL 命令
EXPIRE key seconds
: 用於為指定鍵設定過期時間(秒).TTL key
: 用於獲取指定鍵的剩餘過期時間(秒).注意這裡的時間單位是秒
INCR 和 DECR 命令
INCR key
: 用於遞增指定鍵的值.DECR key
: 用於遞減指定鍵的值.RPUSH 和 LPOP 命令
RPUSH key value
: 用於將值新增到列表的右側.LPOP key
: 用於從列表的左側彈出一個值.HSET 和 HGET 命令
HSET key field value
: 用於設定雜湊表中指定欄位的值.HGET key field
: 用於獲取雜湊表中指定欄位的值.PUBLISH 和 SUBSCRIBE 命令
PUBLISH channel message
: 用於向指定頻道釋出訊息.SUBSCRIBE channel
: 用於訂閱指定頻道的訊息.當然 Redis 支援的命令遠不止這些,它還包括對集合、有序集合、點陣圖、HyperLogLog 等資料結構的操作,以及事務、Lua 指令碼執行等高階功能。我們接下來演示的時候也只是展示幾個大家比較熟悉的指令,這也是我們學習新知識的時候經常使用的方式,先從最簡單最容易的開始入手,循序漸進,這也是微精通
所提倡的方式。
Redis Serialization Protocol (RESP)
是 Redis 使用的二進位制協定,用於使用者端和伺服器之間的通訊。我們可以通過該協定解析Redis伺服器
返回的命令格式,解析我們想要的資料。RESP具有簡潔易解析的特點
簡單字串協定:
+OK\r\n
+OK\r\n
批次字串協定:
$5\r\nhello\r\n
$5\r\nhello\r\n
整數協定:
:42\r\n
:42\r\n
陣列協定:
*3\r\n:1\r\n:2\r\n:3\r\n
*3\r\n:1\r\n:2\r\n:3\r\n
錯誤協定:
-Error message\r\n
-Error message\r\n
需要注意的是字串協定裡面的長度不是具體字元的長度,而是對應的
UTF8
對應的位元組陣列的長度,這一點對於我們解析返回的資料很重要,否則獲取資料的時候會影響資料的完整性。
RESP協定
是Redis高效效能的關鍵之一,它相對比較加單,不需要解析各種頭資訊等,這使得Redis能夠在處理大規模資料和請求時表現出色。瞭解RESP協定可以幫助您更好地理解Redis使用者端類 RedisClient
的內部工作原理。可以理解為它屬於一種應用層面的協定,通過給定的資料格式解析出想要的資料,這也對我們在實際程式設計過程中,解決類似的問題,提供了一個不錯的思路。
上面我們介紹了一些關於Redis
的基礎概念,重點介紹了一下關於Redis
的命令和RESP
,接下來我們就結合上面的理論,基於C# Socket
來簡單的模擬一下如何和Redis Server
進行資料互動。主要就是結合Redis命令
和Redis 協定(RESP)
來簡單的實現。
首先來看一下類的結構
public class RedisClient : IDisposable, IAsyncDisposable
{
//定義預設埠
private readonly int DefaultPort = 6379;
//定義預設地址
private readonly string Host = "localhost";
//心跳間隔,單位為毫秒
private readonly int HeartbeatInterval = 30000;
private bool _isConnected;
//心跳定時器
private Timer _heartbeatTimer;
private Socket _socket;
public RedisClient(string host = "localhost", int defaultPort = 6379)
{
Host = host;
DefaultPort = defaultPort;
// 初始化心跳定時器
_heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
}
//連線方法
public async Task ConnectAsync(int timeoutMilliseconds = 5000)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var cts = new CancellationTokenSource(timeoutMilliseconds);
await _socket.ConnectAsync(Host, DefaultPort, cts.Token);
_isConnected = true;
}
//心跳方法
private async void HeartbeatCallback(object state)
{
if (_isConnected)
{
var pingCommand = "PING\r\n";
await SendCommandAsync(pingCommand);
}
}
//釋放邏輯
public void Dispose()
{
// 停止心跳定時器
_heartbeatTimer.Dispose();
if (_socket != null)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
}
public ValueTask DisposeAsync()
{
Dispose();
return ValueTask.CompletedTask;
}
}
上面的類定義了實現的大致通訊結構,結構中主要涉及到的是通訊相關的功能實現,包含Socket
的初始化資訊、預設的連連線資訊、心跳方法、釋放邏輯等。首先,在建構函式中,指定了預設的Redis埠(6379)、地址(localhost),並初始化了心跳定時器。連線方法ConnectAsync
通過Socket
建立與Redis伺服器
的TCP連線。心跳定時器HeartbeatCallback
定期傳送PING
命令,確保與伺服器的連線保持活動。最後,Dispose方法
用於釋放資源,包括停止心跳定時器和關閉Socket
連線,實現了IDisposable
和IAsyncDisposable
介面。這些功能為RedisClient
類提供了基本的連線和資源管理能力。由於我對Socket
程式設計也不是很熟悉,所以定義的可能不是很完善,有比較熟悉的同學,可以多多指導。
有了這個基礎的架子之後,我們可以在裡面填寫具體的實現邏輯了。首先我們來定義傳送Redis
命令和解析RESP
的邏輯
//傳送命令
public async Task<string> SendCommandAsync(string command)
{
// 傳送命令的實現
if (!_isConnected)
{
// 如果連線已斷開,可以進行重連
await ConnectAsync();
}
//Redis的命令是以\r\n為結尾的
var request = Encoding.UTF8.GetBytes(command + "\r\n");
//傳送命令
await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);
var response = new StringBuilder();
var remainingData = string.Empty;
//初始化響應字串和剩餘資料
byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
while (true)
{
//讀取返回資訊
var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
//將接收到的資料新增到響應字串
var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
//提取完整的響應並新增到響應字串中
var completeResponses = ExtractCompleteResponses(ref responseData);
foreach (var completeResponse in completeResponses)
{
response.Append(completeResponse);
}
remainingData = responseData;
//結果為\r\n讀取結束
if (response.ToString().EndsWith("\r\n"))
{
break;
}
}
}
finally
{
//釋放緩衝區
ArrayPool<byte>.Shared.Return(receiveBuffer);
}
//返回完整的響應字串
return response.ToString();
}
private List<string> ExtractCompleteResponses(ref string data)
{
var completeResponses = new List<string>();
while (true)
{
var index = data.IndexOf("\r\n");
if (index >= 0)
{
// 提取一個完整的響應
var completeResponse = data.Substring(0, index + 2);
//將完整的響應新增到列表中
completeResponses.Add(completeResponse);
data = data.Substring(index + 2);
}
else
{
break;
}
}
return completeResponses;
}
private string ParseResponse(string response)
{
if (response.StartsWith("$"))
{
// 處理 Bulk Strings($)
var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(lengthStr, out int length))
{
if (length == -1)
{
return null!;
}
string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
return value;
}
}
else if (response.StartsWith("+"))
{
// 處理 Simple Strings(+)
return response.Substring(1, response.Length - 3);
}
else if (response.StartsWith(":"))
{
// 處理 Integers(:)
var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(valueStr, out int value))
{
return value.ToString();
}
}
// 如果響應格式不符合預期,丟擲異常
throw new InvalidOperationException(response);
}
上面邏輯涉及到傳送和接收Redis訊息的三個方法SendCommandAsync
、ExtractCompleteResponses
、ParseResponse
。雖然上面程式碼中有註釋,但是咱們分別I簡單的講解一下這三個方法
SendCommandAsync
該方法主要目的是向 Redis 伺服器傳送命令並非同步接收響應
ExtractCompleteResponses
該方法主要用於從接收到的資料中提取出一個或多個完整的響應。
ParseResponse
該方法主要用於解析從 Redis 伺服器接收到的響應字串。
上面有了和Redis通訊
的基本方法,也有了解析RESP
協定的基礎方法,接下來咱們實現幾個簡單的Redis操作指令
來展示一下Redis使用者端具體是如何工作的,簡單的幾個方法如下所示
//切換db操作
public async Task SelectAsync(int dbIndex)
{
var command = $"SELECT {dbIndex}";
await SendCommandAsync(command);
}
//get操作
public async Task<string> GetAsync(string key)
{
var command = $"GET {key}";
return ParseResponse(await SendCommandAsync(command));
}
//set操作
public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
{
var command = $"SET {key} '{value}'";
//判斷會否追加過期時間
if (expiry.HasValue)
{
command += $" EX {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "OK";
}
//支援過期時間的setnx操作
public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
{
//因為預設的setnx方法不支援新增過期時間,為了保證操作的原子性,使用了lua
var command = $"EVAL \"if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end\" 1 {key} '{value}'";
if (expiry.HasValue)
{
command += $" {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "1";
}
//新增支援函過期時間的list push操作
public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
{
var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
if tonumber(ARGV[2]) > 0 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return len";
var keys = new string[] { key };
var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };
var response = await ExecuteLuaScriptAsync(script, keys, args);
return long.Parse(response);
}
//list pop操作
public async Task<string> ListPopAsync(string key)
{
var command = $"LPOP {key}";
return ParseResponse(await SendCommandAsync(command));
}
//listrange操作
public async Task<List<string>> ListRangeAsync(string key, int start, int end)
{
var command = $"LRANGE {key} {start} {end}";
var response = await SendCommandAsync(command);
if (response.StartsWith("*0\r\n"))
{
return new List<string>();
}
//由於list range返回了是一個陣列,所以單獨處理了一下,這裡我使用了正則,解析字串也可以,方法隨意
var values = new List<string>();
var pattern = @"\$\d+\r\n(.*?)\r\n";
MatchCollection matches = Regex.Matches(response, pattern);
foreach (Match match in matches)
{
values.Add(match.Groups[1].Value);
}
return values;
}
//執行lua指令碼的方法
public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
{
//去除lua裡的換行
script = Regex.Replace(script, @"[\r\n]", "");
// 構建EVAL命令,將Lua指令碼、keys和args傳送到Redis伺服器
var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
//拼接key和value引數
if (keys != null && keys.Length != 0)
{
command += string.Join(" ", keys.Select(key => $"{key}"));
}
if (args != null && args.Length != 0)
{
command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
}
return ParseResponse(await SendCommandAsync(command));
}
//redis釋出操作
public async Task SubscribeAsync(string channel, Action<string, string> handler)
{
await SendCommandAsync($"SUBSCRIBE {channel}");
while (true)
{
var response = await SendCommandAsync(string.Empty);
string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
Match match = Regex.Match(response, pattern);
if (match.Success)
{
string ch = match.Groups[2].Value;
string message = match.Groups[3].Value;
handler(ch, message);
}
}
}
//redis訂閱操作
public async Task PublishAsync(string channel, string message)
{
await SendCommandAsync($"PUBLISH {channel} {message}");
}
上面方法中演示了幾個比較常見的操作,很簡單,主要是向大家展示Redis
命令是如何傳送的,從最簡單的GET
、SET
、LIST
、釋出訂閱
、執行LUA
操作方面著手,如果對Redis命令
比較熟悉的話,操作起來還是比較簡單的,這裡給大家講解幾個比較有代表的方法
setnx
方法,由於自帶的setnx方法不支援新增過期時間,為了保證操作的原子性,使用了lua指令碼的方式lpush
也就是上面ListPushAsync
方法中封裝的操作,自帶的也是沒辦法給定過期時間的,為了保證操作的原子性,我在這裡也是用lua進行封裝lua指令碼
的時候的時候需要注意lua指令碼的格式EVAL script numkeys [key [key ...]] [arg [arg ...]]
指令碼後面緊跟著的長度是key的個數
這個需要注意\r\n
的處理和引號
的跳脫問題,當然研究的越深,遇到的問題越多相信大家也看到了,這裡我封裝的都是幾個簡單的操作,難度係數不大,因為主要是向大家演示Redis使用者端
的傳送和接收操作是什麼樣的,甚至我都是直接返回的字串,真實使用的時候我們使用都是需要封裝序列化和反序列化操作的。
上面分別對RedisClient
類中的方法進行了講解,接下來我把我封裝的類完整的給大家貼出來,由於封裝的只是幾個簡單的方法用於演示,所以也只有一個類,程式碼量也不多,主要是為了方便大家理解,有想試驗的同學可以直接拿走
public class RedisClient : IDisposable, IAsyncDisposable
{
private readonly int DefaultPort = 6379;
private readonly string Host = "localhost";
private readonly int HeartbeatInterval = 30000;
private bool _isConnected;
private Timer _heartbeatTimer;
private Socket _socket;
public RedisClient(string host = "localhost", int defaultPort = 6379)
{
Host = host;
DefaultPort = defaultPort;
_heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
}
public async Task ConnectAsync(int timeoutMilliseconds = 5000)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var cts = new CancellationTokenSource(timeoutMilliseconds);
await _socket.ConnectAsync(Host, DefaultPort, cts.Token);
_isConnected = true;
}
public async Task SelectAsync(int dbIndex)
{
var command = $"SELECT {dbIndex}";
await SendCommandAsync(command);
}
public async Task<string> GetAsync(string key)
{
var command = $"GET {key}";
return ParseResponse(await SendCommandAsync(command));
}
public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
{
var command = $"SET {key} '{value}'";
if (expiry.HasValue)
{
command += $" EX {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "OK";
}
public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
{
var command = $"EVAL \"if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end\" 1 {key} '{value}'";
if (expiry.HasValue)
{
command += $" {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "1";
}
public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
{
var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
if tonumber(ARGV[2]) > 0 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return len";
var keys = new string[] { key };
var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };
var response = await ExecuteLuaScriptAsync(script, keys, args);
return long.Parse(response);
}
public async Task<string> ListPopAsync(string key)
{
var command = $"LPOP {key}";
return ParseResponse(await SendCommandAsync(command));
}
public async Task<long> ListLengthAsync(string key)
{
var command = $"LLEN {key}";
return long.Parse(ParseResponse(await SendCommandAsync(command)));
}
public async Task<List<string>> ListRangeAsync(string key, int start, int end)
{
var command = $"LRANGE {key} {start} {end}";
var response = await SendCommandAsync(command);
if (response.StartsWith("*0\r\n"))
{
return new List<string>();
}
var values = new List<string>();
var pattern = @"\$\d+\r\n(.*?)\r\n";
MatchCollection matches = Regex.Matches(response, pattern);
foreach (Match match in matches)
{
values.Add(match.Groups[1].Value);
}
return values;
}
public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
{
script = Regex.Replace(script, @"[\r\n]", "");
var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
if (keys != null && keys.Length != 0)
{
command += string.Join(" ", keys.Select(key => $"{key}"));
}
if (args != null && args.Length != 0)
{
command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
}
return ParseResponse(await SendCommandAsync(command));
}
public async Task SubscribeAsync(string channel, Action<string, string> handler)
{
await SendCommandAsync($"SUBSCRIBE {channel}");
while (true)
{
var response = await SendCommandAsync(string.Empty);
string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
Match match = Regex.Match(response, pattern);
if (match.Success)
{
string ch = match.Groups[2].Value;
string message = match.Groups[3].Value;
handler(ch, message);
}
}
}
public async Task PublishAsync(string channel, string message)
{
await SendCommandAsync($"PUBLISH {channel} {message}");
}
public async Task<string> SendCommandAsync(string command)
{
if (!_isConnected)
{
await ConnectAsync();
}
var request = Encoding.UTF8.GetBytes(command + "\r\n");
await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);
var response = new StringBuilder();
var remainingData = string.Empty;
byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
while (true)
{
var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
var completeResponses = ExtractCompleteResponses(ref responseData);
foreach (var completeResponse in completeResponses)
{
response.Append(completeResponse);
}
remainingData = responseData;
if (response.ToString().EndsWith("\r\n"))
{
break;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(receiveBuffer);
}
return response.ToString();
}
private List<string> ExtractCompleteResponses(ref string data)
{
var completeResponses = new List<string>();
while (true)
{
var index = data.IndexOf("\r\n");
if (index >= 0)
{
var completeResponse = data.Substring(0, index + 2);
completeResponses.Add(completeResponse);
data = data.Substring(index + 2);
}
else
{
break;
}
}
return completeResponses;
}
private string ParseResponse(string response)
{
if (response.StartsWith("$"))
{
var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(lengthStr, out int length))
{
if (length == -1)
{
return null!;
}
string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
return value;
}
}
else if (response.StartsWith("+"))
{
return response.Substring(1, response.Length - 3);
}
else if (response.StartsWith(":"))
{
var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(valueStr, out int value))
{
return value.ToString();
}
}
throw new InvalidOperationException(response);
}
private async void HeartbeatCallback(object state)
{
if (_isConnected)
{
var pingCommand = "PING\r\n";
await SendCommandAsync(pingCommand);
}
}
public void Dispose()
{
_heartbeatTimer.Dispose();
if (_socket != null)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
}
public ValueTask DisposeAsync()
{
Dispose();
return ValueTask.CompletedTask;
}
}
上面我們封裝了RedisClient
類,也講解了裡面實現的幾個簡單的方法,接下來我們就簡單的使用一下它,比較簡單直接上程式碼
GET/SET
是最基礎和最簡單的指令,沒啥可說的直接上程式碼
using RedisClient redisClient = new RedisClient();
await redisClient.ConnectAsync();
//切換db
await redisClient.SelectAsync(3);
bool setResult = await redisClient.SetAsync("key:foo", "are you ok,你好嗎?", TimeSpan.FromSeconds(120));
string getResult = await redisClient.GetAsync("key:foo");
Console.WriteLine("get key:foo:" + getResult);
SETNX
比較常用,很多時候用在做分散式鎖的場景,判斷資源存不存在的時候經常使用
//第一次setnx返回true
bool setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("first setnx order:lock:" + setNxResult);
//第一次setnx返回false
setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("second setnx aname:foo:" + setNxResult);
這裡實現的SubscribeAsync
和PublishAsync
需要使用兩個RedisClient
範例,因為我上面封裝的每個RedisClient
只包含一個Socket
範例所以ReceiveAsync
方法是阻塞的。如果同一個範例的話SubscribeAsync
的時候,在使用PublishAsync
方法的時候會被阻塞,所以演示的時候使用了兩個RedisClient
範例
_ = redisClient.SubscribeAsync("order_msg_ch", (ch, msg) => { Console.WriteLine($"接收訊息:[{ch}]---[{msg}]"); });
Thread.Sleep(2000);
using RedisClient redisClient2 = new RedisClient();
await redisClient2.ConnectAsync();
for (int i = 0; i < 5; i++)
{
await redisClient2.PublishAsync("order_msg_ch", $"傳送訊息{i}");
Thread.Sleep(2000);
}
動態執行lua的功能還是比較強大的,在之前的專案中,我也使用類似的功能。我們是模擬搶單/完成
的場景,比如業務人員需要自行搶單,每個人最多搶幾單,超過閾值則搶單失敗,你需要把搶到的完成了才能繼續搶單,這種操作就需要藉助lua進行操作
//搶單的lua
string takeOrderLuaScript = @"
local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
if ordersTaken < tonumber(ARGV[1]) then
redis.call('INCR', KEYS[1])
return 1
else
return 0
end";
//完成你手裡的訂單操作
string completeOrderLuaScript = @"
local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
if ordersTaken > 0 then
redis.call('DECR', KEYS[1])
return 1
else
return 0
end";
//模擬搶單,最多搶兩單
string result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
//完成訂單
string anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
還有一個功能也是我們之前遇到的,就是使用Redis
實現快取最新的N條訊息,舊的則被拋棄,實現這個功能也需要使用Redis的List
結構結合lua的方式
string luaScript = @"
local record_key = KEYS[1]
local max_records = tonumber(ARGV[1])
local new_record = ARGV[2]
local current_count = redis.call('LLEN', record_key)
if current_count >= max_records then
redis.call('LPOP', record_key)
end
redis.call('RPUSH', record_key, new_record)
";
//這裡限制儲存最新的50條資料,舊的資料則被拋棄
for (int i = 0; i < 60; i++)
{
_ = await redisClient.ExecuteLuaScriptAsync(luaScript, keys: new[] { "msg:list" }, new[] { "50", i.ToString() });
}
LIST
很多時候會把它當做分散式佇列來使用,它提供的操作也比較靈活,咱們這裡只是封裝了幾個最簡單的操作,大致的效果如下所示
//lis入隊操作
var res = await redisClient.ListPushAsync("list:2", "123", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "1234", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "12345", TimeSpan.FromHours(1));
//list出隊操作
var str = await redisClient.ListPopAsync("list:2");
//list長度
var length = await redisClient.ListLengthAsync("list:2");
//list range操作
var list = await redisClient.ListRangeAsync("article:list", 0, 10);
本文我們通過理解Redis命令
和RESP協定
來構建了一個簡單RedisClient
的實現,方便我們更容易的理解Redis使用者端
如何與Redis伺服器
進行通訊,這個實現也可以作為學習和理解·Redis使用者端·的一個很好的例子。當然我們的這個RedisClient
這是瞭解和學習使用,很多場景我們並沒有展示,實際的專案我們還是儘量使用開源的Redis SDK
, .net
中常用的有StackExchange.Redis
、FreeRedis
、csredis
、NewLife.Redis
、Service.Stack.Redis
,其中我經常使用的是StackExchange.Redis
和FreeRedis
整體來說效果還是不錯的。總結一下我們文章的主要內容
Redis命令
的格式Redis協定(RESP)
的主要格式以及如何解析RedisClient
類來演示相關概念lua
來簡單的演示RedisClient
類的使用 作為新時代的職場人,我樂在探究自己感興趣的領域,對未知的事物充滿好奇,並渴望深入瞭解。對於常用的核心技術,我不僅要求自己能夠熟練運用,更追求深入理解其實現原理。面對新的技術趨勢,我決不會視而不見,而是在熟悉更多常用技術棧的同時,努力深入掌握一些重要的知識。我堅信,學無止境,每一步的進步都帶來無比的喜悅與成就感。