所謂網路抖動問題, 簡單來說就是防止使用者短暫的時間內對同一個介面多次點選存取
這裡利用的是redis鎖的原子性和with Statement上下文管理器實現, 另外該類還支援協程, 可使用async with
呼叫
FuncDefine.py
def clear_all_lock(PREFIX='lock'):
keys = redis_operator.get_redis_keys_pattern(PREFIX + '*')
for key in keys:
if isinstance(key, bytes):
kwl_py_write_log(key.decode(encoding='utf-8'), msgid='del_redis_key')
redis_operator.delete_redis_key(key.decode(encoding='utf-8'), 1)
def unlock(key):
redis_operator.delete_redis_key(key, 1)
class RedisLock:
DEFAULT_VALUE = 1
PREFIX = 'lock'
def __init__(self, key, lock_time=300):
"""
初始化redis鎖
:param key: 關鍵字
:param lock_time: 上鎖時間 5min
"""
self._key = RedisLock.PREFIX + key
self.lock_time = lock_time
self.hold_lock = False
@property
def key(self):
return self._key
@key.setter
def key(self, key):
self._key = RedisLock.PREFIX + key
def __enter__(self):
self.hold_lock = self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.hold_lock:
self.release()
return False
async def __aenter__(self):
self.hold_lock = await self.acquire_cas_lock()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.hold_lock:
self.release()
return False
async def acquire_cas_lock(self, lock_time=60):
try:
return await asyncio.wait_for(self.acquire_lock_until_succ(), lock_time)
except asyncio.TimeoutError as e:
return False
async def acquire_lock_until_succ(self):
while redis_operator.set_redis_key_ex_nx(self.key, self.DEFAULT_VALUE, self.lock_time) is not True:
# redis return is None or other
await asyncio.sleep(0.01)
return True
def acquire(self):
"""
設定redis鎖
:param key:
:param lock_time:
:return:
"""
try:
return redis_operator.set_redis_key_ex_nx(self.key, self.DEFAULT_VALUE, self.lock_time) is True
except Exception:
return False
def release(self):
redis_operator.delete_redis_key(self.key, 1)
redis_operator.py
import redis
from redisConfig import *
# ------------------------------------------------------------------------------------------------------
# 主從redis,個數一一對應
g_main_redis_pool_list = []
g_slave_redis_pool_list = []
g_main_redis_is_ok = [] # 主redis是否可用True為主ok
g_slave_redis_is_ok = [] # 從redis是否可用
for each_redis in g_main_redis_server:
redis_pool = redis.ConnectionPool(host=each_redis[0], port=each_redis[1], password=each_redis[2], socket_timeout=8,
socket_connect_timeout=5)
g_main_redis_pool_list.append(redis_pool)
g_main_redis_is_ok.append(True)
for each_redis in g_slave_redis_server:
redis_pool = redis.ConnectionPool(host=each_redis[0], port=each_redis[1], password=each_redis[2], socket_timeout=8,
socket_connect_timeout=5)
g_slave_redis_pool_list.append(redis_pool)
g_slave_redis_is_ok.append(True)
def get_redis_by_key(strkey, nums):
return (ord(strkey[0]) + ord(strkey[-1])) % nums
# 從redis取
def get_redis_key(key):
# 根據key來分庫
index = get_redis_by_key(key, len(g_main_redis_pool_list))
if g_main_redis_is_ok[index]:
# 主ok
try:
return redis.Redis(connection_pool=g_main_redis_pool_list[index]).get(key)
except Exception:
# 主標記為掛
g_main_redis_is_ok[index] = False
# 主掛了試試從能不能用
g_slave_redis_is_ok[index] = True
if g_slave_redis_is_ok[index]:
# 從ok
try:
return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).get(key)
except Exception as e:
# 從標記為掛
g_slave_redis_is_ok[index] = False
# 從也掛了下回只能嘗試使用主
g_main_redis_is_ok[index] = True
# 丟擲異常
raise Exception(repr(e))
# 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
g_main_redis_is_ok[index] = Trueget_redis_by_key
raise Exception('內部錯誤,get_redis_key執行異常')
# redis存值且設定生命週期
def set_redis_key_ex(key, value, expire):
# 根據key來分庫
index = get_redis_by_key(key, len(g_main_redis_pool_list))
if g_main_redis_is_ok[index]:
# 主ok
try:
if expire == 0:
return redis.Redis(connection_pool=g_main_redis_pool_list[index]).set(key, value)
return redis.Redis(connection_pool=g_main_redis_pool_list[index]).setex(key, value, expire)
except Exception:
# 主標記為掛
g_main_redis_is_ok[index] = False
# 主掛了試試從能不能用
g_slave_redis_is_ok[index] = True
if g_slave_redis_is_ok[index]:
# 從ok
try:
if expire == 0:
return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).set(key, value)
return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).setex(key, value, expire)
except Exception as e:
# 從標記為掛
g_slave_redis_is_ok[index] = False
# 從也掛了下回只能嘗試使用主
g_main_redis_is_ok[index] = True
# 丟擲異常
raise Exception(repr(e))
# 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
g_main_redis_is_ok[index] = True
raise Exception('內部錯誤,set_redis_key_ex執行異常')
# redis存值且設定生命週期
def expire_redis_key(key, expire):
# 根據key來分庫
index = get_redis_by_key(key, len(g_main_redis_pool_list))
if g_main_redis_is_ok[index]:
# 主ok
try:
if expire == 0:
return 0
return redis.Redis(connection_pool=g_main_redis_pool_list[index]).expire(key, expire)
except Exception:
# 主標記為掛
g_main_redis_is_ok[index] = False
# 主掛了試試從能不能用
g_slave_redis_is_ok[index] = True
if g_slave_redis_is_ok[index]:
# 從ok
try:
if expire == 0:
return 0
return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).expire(key, expire)
except Exception as e:
# 從標記為掛
g_slave_redis_is_ok[index] = False
# 從也掛了下回只能嘗試使用主
g_main_redis_is_ok[index] = True
# 丟擲異常
raise Exception(repr(e))
# 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
g_main_redis_is_ok[index] = True
raise Exception('內部錯誤,expire_redis_key執行異常')
# redis刪除key
def delete_redis_key(key, expire):
# 根據key來分庫
index = get_redis_by_key(key, len(g_main_redis_pool_list))
if g_main_redis_is_ok[index]:
# 主ok
try:
if expire == 0:
return 0
return redis.Redis(connection_pool=g_main_redis_pool_list[index]).delete(key)
except Exception:
# 主標記為掛
g_main_redis_is_ok[index] = False
# 主掛了試試從能不能用
g_slave_redis_is_ok[index] = True
if g_slave_redis_is_ok[index]:
# 從ok
try:
if expire == 0:
return 0
return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).delete(key)
except Exception as e:
# 從標記為掛
g_slave_redis_is_ok[index] = False
# 從也掛了下回只能嘗試使用主
g_main_redis_is_ok[index] = True
# 丟擲異常
raise Exception(repr(e))
# 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
g_main_redis_is_ok[index] = True
raise Exception('內部錯誤,delete_redis_key執行異常')
def set_redis_key_ex_nx(key, value, expire):
"""如果有鍵值則不設定"""
# 根據key來分庫
index = get_redis_by_key(key, len(g_main_redis_pool_list))
if g_main_redis_is_ok[index]:
# 主ok
try:
if expire == 0:
return 0
return redis.Redis(connection_pool=g_main_redis_pool_list[index]).set(key, value, ex=expire, nx=True)
except Exception:
# 主標記為掛
g_main_redis_is_ok[index] = False
# 主掛了試試從能不能用
g_slave_redis_is_ok[index] = True
if g_slave_redis_is_ok[index]:
# 從ok
try:
if expire == 0:
return 0
return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).set(key, value, ex=expire, nx=True)
except Exception as e:
# 從標記為掛
g_slave_redis_is_ok[index] = False
# 從也掛了下回只能嘗試使用主
g_main_redis_is_ok[index] = True
# 丟擲異常
raise Exception(repr(e))
# 按理不可能出現這種情況,主從皆False,全掛的情況也會至少開啟一個
g_main_redis_is_ok[index] = True
raise Exception('內部錯誤,set_redis_key_nx_執行異常')
def get_redis_keys_pattern(key_pattern):
from builtins import enumerate
key_set = set()
# 主庫找
for index, is_ok in enumerate(g_main_redis_is_ok):
if is_ok:
key_set.update(redis.Redis(connection_pool=g_main_redis_pool_list[index]).keys(key_pattern))
# 從庫找
for index, is_ok in enumerate(g_slave_redis_is_ok):
if is_ok:
key_set.update(redis.Redis(connection_pool=g_slave_redis_pool_list[index]).keys(key_pattern))
return key_set
if __name__ == "__main__":
# set_redis_key_ex('ab','a',10)
print(get_redis_key('ab').decode())
import FuncDefine
with FuncDefine.RedisLock(rediskey) as lock:
if not lock.hold_lock:
return response(3, '商品新增中,請稍後~', '', [])
整體來看也就是介面存取過來的時候, 設定一個redis_key(nx=True, ex=300), 這樣在五分鐘之內就可以避免重複點選的情況
__enter__()
方法, 從而呼叫self.acquire()__exit__()
函數, 呼叫函數刪除當前redis的key對應的值本文來自部落格園,作者:{Max},僅供學習和參考