如何在 PyQt 中實現非同步資料庫請求

2023-12-07 21:01:10

需求

開發軟體的時候不可避免要和資料庫發生互動,但是有些 SQL 請求非常耗時,如果在主執行緒中傳送請求,可能會造成介面卡頓。這篇部落格將會介紹一種讓資料庫請求變得和前端的 ajax 請求一樣簡單,且不會阻塞介面的非同步請求方法。

實現過程

在實現非同步請求之前,需要先明確一下函數簽名:

def sqlRequest(
    service: str, 
    method: str, 
    slot, 
    params: dict = None
)

各個引數的解釋如下:

  • service: 業務名
  • method: 介面名
  • slot: 拿到資料後呼叫的回撥函數
  • params: 請求引數

總體流程如下圖所示,包括子介面傳送請求、資料庫執行緒處理請求、主介面呼叫回撥函數來消費響應結果三個步驟。

訊號匯流排

在 Qt 中,子執行緒無法直接更新主介面,只能傳送訊號通知主執行緒,然後在主執行緒中更新介面。在之前的部落格《如何在 pyqt 中實現全域性事件匯流排》介紹了訊號匯流排的使用,通過引入訊號匯流排,可實現任意層級的元件之間的通訊。

本文的訊號匯流排只含有兩個訊號,一個用來請求資料,一個用來消費資料:

class SignalBus(QObject):
    """ Signal bus """
    fetchDataSig = Signal(SqlRequest)    # 請求資料訊號
    dataFetched = Signal(SqlResponse)    # 響應資料訊號

    
signalBus = SignalBus()
    
    
class SqlRequest:
    """ Sql request """

    def __init__(self, service: str, method: str, slot=None, params: dict = None):
        self.service = service
        self.method = method
        self.slot = slot
        self.params = params or {}


class SqlResponse:
    """ Sql response """

    def __init__(self, data, slot):
        self.slot = slot
        self.data = data

傳送請求

子介面中通過呼叫 sqlRequest() 函數來發起非同步 SQL 請求,該函數只是將引數封裝為 SqlRequest 物件,然後通過 signalBusfetchDataSig 訊號傳送給資料庫子執行緒:

def sqlRequest(service: str, method: str, slot=None, params: dict = None):
    """ query sql from database """
    request = SqlRequest(service, method, slot, params)
    signalBus.fetchDataSig.emit(request)

比如下圖中商品型別下拉框的資料就來自於資料庫:

在元件 LicenseCard 中使用下述程式碼就能完成資料的請求和消費(元件庫參見 https://qfluentwidgets.com/zh/ ):

from qfluentwidgets import HeaderCardWidget, ComboBox

class LicenseCard(HeaderCardWidget):
    
    def __init__(self, parent=None):
        super().__init__("許可證", parent)
        self.goodsComboBox = ComboBox(self)
        
        # 請求商品資訊
        sqlRequest("goodsService", "listAll", lambda i: self.onGoodsFetched(i))

    def onGoodsFetched(self, goods: List[Goods]):
        """ 將商品資訊新增到下拉框中 """
        for good in goods:
            self.goodsComboBox.addItem(good.name, userData=good)

處理請求

子執行緒 DatabaseThread 中維護著一個請求佇列 tasks,每當收到訊號匯流排的 fetchDataSig 訊號時,就會使用反射機制將請求中攜帶的 servicemethod 字串轉換為資料庫業務類的方法指標,並將這個指標新增到佇列中等待呼叫。呼叫方法返回的資料會被封裝為 SqlResponse 物件,接著通過訊號匯流排傳送給主介面。

class DatabaseThread(QThread):
    """ Database thread """

    def __init__(self, db: QSqlDatabase = None, parent=None):
        super().__init__(parent=parent)
        self.database = Database(db, self)
        self.tasks = deque()

        # 處理請求訊號
        signalBus.fetchDataSig.connect(self.onFetchData)

    def run(self):
        """ 處理請求 """
        while self.tasks:
            task, request = self.tasks.popleft()
            result = task(**request.params)
            signalBus.dataFetched.emit(SqlResponse(result, request.slot))

    def onFetchData(self, request: SqlRequest):
        """ 將請求新增到佇列中 """
        service = getattr(self.database, request.service)
        task = getattr(service, request.method)
        self.tasks.append((task, request))

        if not self.isRunning():
            self.start()
                

class Database(QObject):
    """ Database """

    def __init__(self, db: QSqlDatabase = None, parent=None):
        super().__init__(parent=parent)
        self.orderService = OrderService(db)
        self.userService = UserService(db)
        self.goodsService = GoodsService(db)

處理響應結果

主介面中只需將訊號匯流排的 dataFetched 訊號連線槽函數,然後在槽函數中對取出 response 物件中的資料,並呼叫回撥函數來消費資料即可:

from qfluentwidgets import MSFluentWindow

class MainWindow(MSFluentWindow):
    """ 主介面 """
    
    def __init__(self):
        super().__init__()
        
        # 處理響應結果
        signalBus.dataFetched.connect(self.onDataFetched)

    def onDataFetched(self, response: SqlResponse):
        if response.slot:
            response.slot(response.data)

總結

在這篇部落格中我們使用子執行緒和訊號匯流排完成了非同步資料庫請求操作,介面所使用的元件全部來自於 https://qfluentwidgets.com/zh/ ,以上~~