Redis解決網路抖動問題

2023-07-22 18:00:55

Redis解決網路抖動問題

所謂網路抖動問題, 簡單來說就是防止使用者短暫的時間內對同一個介面多次點選存取

這裡利用的是redis鎖的原子性和with Statement上下文管理器實現, 另外該類還支援協程, 可使用async with 呼叫

1. 原始碼

FuncDefine.py

def clear_all_lock(PREFIX='lock'):
    keys = redis_operator.get_redis_keys_pattern(PREFIX + '*')

    for key in keys:
        if isinstance(key, bytes):
            kwl_py_write_log(key.decode(encoding='utf-8'), msgid='del_redis_key')
            redis_operator.delete_redis_key(key.decode(encoding='utf-8'), 1)


def unlock(key):
    redis_operator.delete_redis_key(key, 1)


class RedisLock:
    DEFAULT_VALUE = 1
    PREFIX = 'lock'

    def __init__(self, key, lock_time=300):
        """
        初始化redis鎖
        :param key:  關鍵字
        :param lock_time: 上鎖時間 5min
        """
        self._key = RedisLock.PREFIX + key
        self.lock_time = lock_time
        self.hold_lock = False

    @property
    def key(self):
        return self._key

    @key.setter
    def key(self, key):
        self._key = RedisLock.PREFIX + key

    def __enter__(self):
        self.hold_lock = self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.hold_lock:
            self.release()
        return False

    async def __aenter__(self):
        self.hold_lock = await self.acquire_cas_lock()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.hold_lock:
            self.release()
        return False

    async def acquire_cas_lock(self, lock_time=60):
        try:
            return await asyncio.wait_for(self.acquire_lock_until_succ(), lock_time)
        except asyncio.TimeoutError as e:
            return False

    async def acquire_lock_until_succ(self):

        while redis_operator.set_redis_key_ex_nx(self.key, self.DEFAULT_VALUE, self.lock_time) is not True:
            # redis return is None or other
            await asyncio.sleep(0.01)
        return True

    def acquire(self):
        """
        設定redis鎖
        :param key:
        :param lock_time:
        :return:
        """
        try:
            return redis_operator.set_redis_key_ex_nx(self.key, self.DEFAULT_VALUE, self.lock_time) is True
        except Exception:
            return False

    def release(self):
        redis_operator.delete_redis_key(self.key, 1)

redis_operator.py

import redis
from redisConfig import *

# ------------------------------------------------------------------------------------------------------
# 主從redis,個數一一對應
g_main_redis_pool_list = []
g_slave_redis_pool_list = []
g_main_redis_is_ok = []  # 主redis是否可用True為主ok
g_slave_redis_is_ok = []  # 從redis是否可用

for each_redis in g_main_redis_server:
    redis_pool = redis.ConnectionPool(host=each_redis[0], port=each_redis[1], password=each_redis[2], socket_timeout=8,
                                      socket_connect_timeout=5)
    g_main_redis_pool_list.append(redis_pool)
    g_main_redis_is_ok.append(True)

for each_redis in g_slave_redis_server:
    redis_pool = redis.ConnectionPool(host=each_redis[0], port=each_redis[1], password=each_redis[2], socket_timeout=8,
                                      socket_connect_timeout=5)
    g_slave_redis_pool_list.append(redis_pool)
    g_slave_redis_is_ok.append(True)


def get_redis_by_key(strkey, nums):
    return (ord(strkey[0]) + ord(strkey[-1])) % nums


# 從redis取
def get_redis_key(key):
    # 根據key來分庫
    index = get_redis_by_key(key, len(g_main_redis_pool_list))
    if g_main_redis_is_ok[index]:
        # 主ok
        try:
            return redis.Redis(connection_pool=g_main_redis_pool_list[index]).get(key)
        except Exception:
            # 主標記為掛
            g_main_redis_is_ok[index] = False
            # 主掛了試試從能不能用
            g_slave_redis_is_ok[index] = True
    if g_slave_redis_is_ok[index]:
        # 從ok
        try:
            return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).get(key)
        except Exception as e:
            # 從標記為掛
            g_slave_redis_is_ok[index] = False
            # 從也掛了下回只能嘗試使用主
            g_main_redis_is_ok[index] = True
            # 丟擲異常
            raise Exception(repr(e))
    # 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
    g_main_redis_is_ok[index] = Trueget_redis_by_key
    raise Exception('內部錯誤,get_redis_key執行異常')


# redis存值且設定生命週期
def set_redis_key_ex(key, value, expire):
    # 根據key來分庫
    index = get_redis_by_key(key, len(g_main_redis_pool_list))
    if g_main_redis_is_ok[index]:
        # 主ok
        try:
            if expire == 0:
                return redis.Redis(connection_pool=g_main_redis_pool_list[index]).set(key, value)
            return redis.Redis(connection_pool=g_main_redis_pool_list[index]).setex(key, value, expire)
        except Exception:
            # 主標記為掛
            g_main_redis_is_ok[index] = False
            # 主掛了試試從能不能用
            g_slave_redis_is_ok[index] = True
    if g_slave_redis_is_ok[index]:
        # 從ok
        try:
            if expire == 0:
                return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).set(key, value)
            return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).setex(key, value, expire)
        except Exception as e:
            # 從標記為掛
            g_slave_redis_is_ok[index] = False
            # 從也掛了下回只能嘗試使用主
            g_main_redis_is_ok[index] = True
            # 丟擲異常
            raise Exception(repr(e))
    # 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
    g_main_redis_is_ok[index] = True
    raise Exception('內部錯誤,set_redis_key_ex執行異常')


# redis存值且設定生命週期
def expire_redis_key(key, expire):
    # 根據key來分庫
    index = get_redis_by_key(key, len(g_main_redis_pool_list))
    if g_main_redis_is_ok[index]:
        # 主ok
        try:
            if expire == 0:
                return 0
            return redis.Redis(connection_pool=g_main_redis_pool_list[index]).expire(key, expire)
        except Exception:
            # 主標記為掛
            g_main_redis_is_ok[index] = False
            # 主掛了試試從能不能用
            g_slave_redis_is_ok[index] = True
    if g_slave_redis_is_ok[index]:
        # 從ok
        try:
            if expire == 0:
                return 0
            return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).expire(key, expire)
        except Exception as e:
            # 從標記為掛
            g_slave_redis_is_ok[index] = False
            # 從也掛了下回只能嘗試使用主
            g_main_redis_is_ok[index] = True
            # 丟擲異常
            raise Exception(repr(e))
    # 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
    g_main_redis_is_ok[index] = True
    raise Exception('內部錯誤,expire_redis_key執行異常')


# redis刪除key
def delete_redis_key(key, expire):
    # 根據key來分庫
    index = get_redis_by_key(key, len(g_main_redis_pool_list))
    if g_main_redis_is_ok[index]:
        # 主ok
        try:
            if expire == 0:
                return 0
            return redis.Redis(connection_pool=g_main_redis_pool_list[index]).delete(key)
        except Exception:
            # 主標記為掛
            g_main_redis_is_ok[index] = False
            # 主掛了試試從能不能用
            g_slave_redis_is_ok[index] = True
    if g_slave_redis_is_ok[index]:
        # 從ok
        try:
            if expire == 0:
                return 0
            return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).delete(key)
        except Exception as e:
            # 從標記為掛
            g_slave_redis_is_ok[index] = False
            # 從也掛了下回只能嘗試使用主
            g_main_redis_is_ok[index] = True
            # 丟擲異常
            raise Exception(repr(e))
    # 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
    g_main_redis_is_ok[index] = True
    raise Exception('內部錯誤,delete_redis_key執行異常')


def set_redis_key_ex_nx(key, value, expire):
    """如果有鍵值則不設定"""
    # 根據key來分庫
    index = get_redis_by_key(key, len(g_main_redis_pool_list))
    if g_main_redis_is_ok[index]:
        # 主ok
        try:
            if expire == 0:
                return 0
            return redis.Redis(connection_pool=g_main_redis_pool_list[index]).set(key, value, ex=expire, nx=True)
        except Exception:
            # 主標記為掛
            g_main_redis_is_ok[index] = False
            # 主掛了試試從能不能用
            g_slave_redis_is_ok[index] = True

    if g_slave_redis_is_ok[index]:
        # 從ok
        try:
            if expire == 0:
                return 0
            return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).set(key, value, ex=expire, nx=True)
        except Exception as e:
            # 從標記為掛
            g_slave_redis_is_ok[index] = False
            # 從也掛了下回只能嘗試使用主
            g_main_redis_is_ok[index] = True
            # 丟擲異常
            raise Exception(repr(e))
    # 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
    g_main_redis_is_ok[index] = True
    raise Exception('內部錯誤,set_redis_key_nx_執行異常')


def get_redis_keys_pattern(key_pattern):
    from builtins import enumerate

    key_set = set()
    # 主庫找
    for index, is_ok in enumerate(g_main_redis_is_ok):
        if is_ok:
            key_set.update(redis.Redis(connection_pool=g_main_redis_pool_list[index]).keys(key_pattern))
    # 從庫找
    for index, is_ok in enumerate(g_slave_redis_is_ok):
        if is_ok:
            key_set.update(redis.Redis(connection_pool=g_slave_redis_pool_list[index]).keys(key_pattern))

    return key_set


if __name__ == "__main__":
    # set_redis_key_ex('ab','a',10)
    print(get_redis_key('ab').decode())

2. 使用方法

import FuncDefine
	with FuncDefine.RedisLock(rediskey) as lock:
		if not lock.hold_lock:
			return response(3, '商品新增中,請稍後~', '', [])

3. 原始碼分析

整體來看也就是介面存取過來的時候, 設定一個redis_key(nx=True, ex=300), 這樣在五分鐘之內就可以避免重複點選的情況

  1. 初始化redis, 上下文管理器會觸發__enter__()方法, 從而呼叫self.acquire()

  1. 設定redis的鍵, 如果不加nx=True, redis的set會直接覆蓋之前key的值, 這裡還存在一個主從redis, 感興趣可以看看原始碼

  1. 當執行完with中的程式碼塊, 會觸發__exit__()函數, 呼叫函數刪除當前redis的key對應的值

  1. 剩下的一些函數都是封裝的一些通用方法, 比如檢視當前key值