眾所周知,非同步並行程式設計可以幫助程式更好地處理阻塞操作,比如網路 IO 操作或檔案 IO 操作,避免因等待這些操作完成而導致程式卡住的情況。雲端儲存檔案傳輸場景正好包含網路 IO 操作和檔案 IO 操作,比如業內相對著名的七牛雲端儲存,官方sdk的預設阻塞傳輸模式雖然差強人意,但未免有些循規蹈矩,不夠銳意創新。在全球同性交友網站Github上找了一圈,也沒有找到非同步版本,那麼本次我們來自己動手將同步阻塞版本改造為非同步非阻塞版本,並上傳至Python官方庫。
首先參見七牛雲官方介面檔案:https://developer.qiniu.com/kodo,新建qiniu_async.py檔案:
# @Author:Liu Yue (v3u.cn)
# @Software:Vscode
# @Time:2022/12/30
import base64
import hmac
import time
from hashlib import sha1
import json
import httpx
import aiofiles
class Qiniu:
def __init__(self, access_key, secret_key):
"""初始化"""
self.__checkKey(access_key, secret_key)
self.__access_key = access_key
self.__secret_key = secret_key.encode('utf-8')
def get_access_key(self):
return self.__access_key
def get_secret_key(self):
return self.__secret_key
def __token(self, data):
hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)
return self.urlsafe_base64_encode(hashed.digest())
def token(self, data):
return '{0}:{1}'.format(self.__access_key, self.__token(data))
def token_with_data(self, data):
data = self.urlsafe_base64_encode(data)
return '{0}:{1}:{2}'.format(
self.__access_key, self.__token(data), data)
def urlsafe_base64_encode(self,data):
if isinstance(data, str):
data = data.encode('utf-8')
ret = base64.urlsafe_b64encode(data)
data = ret.decode('utf-8')
return data
@staticmethod
def __checkKey(access_key, secret_key):
if not (access_key and secret_key):
raise ValueError('invalid key')
def upload_token(
self,
bucket,
key=None,
expires=3600,
policy=None,
strict_policy=True):
"""生成上傳憑證
Args:
bucket: 上傳的空間名
key: 上傳的檔名,預設為空
expires: 上傳憑證的過期時間,預設為3600s
policy: 上傳策略,預設為空
Returns:
上傳憑證
"""
if bucket is None or bucket == '':
raise ValueError('invalid bucket name')
scope = bucket
if key is not None:
scope = '{0}:{1}'.format(bucket, key)
args = dict(
scope=scope,
deadline=int(time.time()) + expires,
)
return self.__upload_token(args)
@staticmethod
def up_token_decode(up_token):
up_token_list = up_token.split(':')
ak = up_token_list[0]
sign = base64.urlsafe_b64decode(up_token_list[1])
decode_policy = base64.urlsafe_b64decode(up_token_list[2])
decode_policy = decode_policy.decode('utf-8')
dict_policy = json.loads(decode_policy)
return ak, sign, dict_policy
def __upload_token(self, policy):
data = json.dumps(policy, separators=(',', ':'))
return self.token_with_data(data)
@staticmethod
def __copy_policy(policy, to, strict_policy):
for k, v in policy.items():
if (not strict_policy) or k in _policy_fields:
to[k] = v
這裡有兩個很關鍵的非同步非阻塞三方庫,分別是httpx和aiofiles,對應處理網路IO和檔案IO阻塞問題:
pip3 install httpx
pip3 install aiofiles
隨後按照檔案流程通過加密方法獲取檔案上傳token,這裡無須進行非同步改造,因為並不涉及IO操作:
q = Qiniu(access_key,access_secret)
token = q.upload_token("空間名稱")
print(token)
程式返回:
➜ mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9
接著新增檔案流推播方法,先看官方原版邏輯:
def put_data(
up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,
fname=None, hostscache_dir=None, metadata=None):
"""上傳二進位制流到七牛
Args:
up_token: 上傳憑證
key: 上傳檔名
data: 上傳二進位制流
params: 自定義變數,規格參考 https://developer.qiniu.com/kodo/manual/vars#xvar
mime_type: 上傳資料的mimeType
check_crc: 是否校驗crc32
progress_handler: 上傳進度
hostscache_dir: host請求 快取檔案儲存位置
metadata: 後設資料
Returns:
一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}
一個ResponseInfo物件
"""
final_data = b''
if hasattr(data, 'read'):
while True:
tmp_data = data.read(config._BLOCK_SIZE)
if len(tmp_data) == 0:
break
else:
final_data += tmp_data
else:
final_data = data
crc = crc32(final_data)
return _form_put(up_token, key, final_data, params, mime_type,
crc, hostscache_dir, progress_handler, fname, metadata=metadata)
def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,
modify_time=None, keep_last_modified=False, metadata=None):
fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)
if crc:
fields['crc32'] = crc
if key is not None:
fields['key'] = key
fields['token'] = up_token
if config.get_default('default_zone').up_host:
url = config.get_default('default_zone').up_host
else:
url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)
# name = key if key else file_name
fname = file_name
if not fname or not fname.strip():
fname = 'file_name'
# last modify time
if modify_time and keep_last_modified:
fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)
if metadata:
for k, v in metadata.items():
if k.startswith('x-qn-meta-'):
fields[k] = str(v)
r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
if r is None and info.need_retry():
if info.connect_failed:
if config.get_default('default_zone').up_host_backup:
url = config.get_default('default_zone').up_host_backup
else:
url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)
if hasattr(data, 'read') is False:
pass
elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
data.seek(0)
else:
return r, info
r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
return r, info
這裡官方使用兩個方法,先試用put_data方法將字串轉換為二進位制檔案流,隨後呼叫_form_put進行同步上傳操作,這裡_form_put這個私有方法是可複用的,既相容檔案流也相容檔案實體,寫法上非常值得我們借鑑,弄明白了官方原版的流程後,讓我們撰寫檔案流傳輸的非同步版本:
# 上傳檔案流
async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):
data.encode('utf-8')
fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)
if key is not None:
fields['key'] = key
fields['token'] = up_token
fname = file_name
if not fname or not fname.strip():
fname = 'file_name'
async with httpx.AsyncClient() as client:
# 呼叫非同步使用await關鍵字
res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})
print(res.text)
這裡我們宣告非同步方法upload_data,通過encode直接轉換檔案流,並使用非同步httpx.AsyncClient()物件將檔案流推播到官網介面地址:up-z1.qiniup.com
隨後進行測試:
import asyncio
q = qiniu_async.Qiniu("accesskey","accesssecret")
token = q.upload_token("空間名稱")
#檔案流上傳
asyncio.run(q.upload_data(token,"3343.txt","123測試"))
程式返回:
➜ mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
{"hash":"FtnQXAXft5AsOH1mrmXGaRzSt-95","key":"33434.txt"}
介面會返回檔案流的hash編碼,沒有問題。
接著檢視檔案上傳流程:
def put_file(up_token, key, file_path, params=None,
mime_type='application/octet-stream', check_crc=False,
progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,
part_size=None, version=None, bucket_name=None, metadata=None):
"""上傳檔案到七牛
Args:
up_token: 上傳憑證
key: 上傳檔名
file_path: 上傳檔案的路徑
params: 自定義變數,規格參考 https://developer.qiniu.com/kodo/manual/vars#xvar
mime_type: 上傳資料的mimeType
check_crc: 是否校驗crc32
progress_handler: 上傳進度
upload_progress_recorder: 記錄上傳進度,用於斷點續傳
hostscache_dir: host請求 快取檔案儲存位置
version: 分片上傳版本 目前支援v1/v2版本 預設v1
part_size: 分片上傳v2必傳欄位 預設大小為4MB 分片大小範圍為1 MB - 1 GB
bucket_name: 分片上傳v2欄位 空間名稱
metadata: 後設資料資訊
Returns:
一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}
一個ResponseInfo物件
"""
ret = {}
size = os.stat(file_path).st_size
with open(file_path, 'rb') as input_stream:
file_name = os.path.basename(file_path)
modify_time = int(os.path.getmtime(file_path))
if size > config.get_default('default_upload_threshold'):
ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,
mime_type, progress_handler,
upload_progress_recorder=upload_progress_recorder,
modify_time=modify_time, keep_last_modified=keep_last_modified,
part_size=part_size, version=version, bucket_name=bucket_name, metadata=metadata)
else:
crc = file_crc32(file_path)
ret, info = _form_put(up_token, key, input_stream, params, mime_type,
crc, hostscache_dir, progress_handler, file_name,
modify_time=modify_time, keep_last_modified=keep_last_modified, metadata=metadata)
return ret, info
這裡官方使用的是標準庫上下文管理器同步讀取檔案,改寫為非同步方法:
# 上傳檔案實體
async def upload_file(self,up_token,key,path,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):
async with aiofiles.open(path, mode='rb') as f:
contents = await f.read()
fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)
if key is not None:
fields['key'] = key
fields['token'] = up_token
fname = file_name
if not fname or not fname.strip():
fname = 'file_name'
async with httpx.AsyncClient() as client:
# 呼叫非同步使用await關鍵字
res = await client.post(url,data=fields,files={'file': (fname,contents,mime_type)})
print(res.text)
通過aiofiles非同步讀取檔案後,在通過httpx.AsyncClient()進行非同步傳輸。
需要注意的是,這裡預設傳輸到up-z1.qiniup.com介面,如果是不同區域的雲端儲存伺服器,需要更改url引數的值,具體伺服器介面列表請參照官網檔案。
至此,檔案流和檔案非同步傳輸就改造好了。
為了方便廣大七牛雲使用者使用非同步傳輸版本庫,可以將qiniu-async上傳到Python官方庫,首先註冊成為Python官方庫的開發者:pypi.org/
隨後在專案根目錄下新建setup.py檔案:
import setuptools
import pathlib
here = pathlib.Path(__file__).parent.resolve()
long_description = (here / "README.md").read_text(encoding="utf-8")
setuptools.setup(
name="qiniu-async",
version="1.0.1",
author="LiuYue",
author_email="[email protected]",
description="qiniu_async python library",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/qiniu-async",
packages=setuptools.find_packages(),
license="Apache 2.0",
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3 :: Only",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
keywords="qiniu, qiniu_async, async",
py_modules=[
'qiniu_async'
],
install_requires=["aiofiles","httpx"],
)
這是安裝檔案,主要為了宣告該模組的名稱、作者、版本以及依賴庫。
隨後本地打包檔案:
python3 setup.py sdist
程式會根據setup.py檔案生成壓縮包:
➜ qiniu_async tree
.
├── README.md
├── dist
│ └── qiniu-async-1.0.1.tar.gz
├── https:
│ └── github.com
│ └── zcxey2911
│ └── qiniu-async.git
├── qiniu_async.egg-info
│ ├── PKG-INFO
│ ├── SOURCES.txt
│ ├── dependency_links.txt
│ ├── requires.txt
│ └── top_level.txt
├── qiniu_async.py
└── setup.py
接著安裝twine庫, 準備提交Python官網:
pip3 install twine
隨後在根目錄執行命令提交:
twine upload dist/*
在官網進行檢視:https://pypi.org/project/qiniu-async/
隨後本地就可以直接通過pip命令句進行安裝了:
pip install qiniu-async -i https://pypi.org/simple
非常方便。
雲端儲存,非同步加持,猛虎添翼,未敢擁鉢獨饗,除了通過pip安裝qiniu-async庫,也奉上Github專案地址:https://github.com/zcxey2911/qiniu-async ,與眾鄉親同饗。