SignalR+Hangfire 實現後臺任務佇列和實時通訊

2023-06-07 06:00:48

SignalR+Hangfire 實現後臺任務佇列和實時通訊

1.簡介:

SignalR是一個.NET的開源框架,SignalR可使用Web Socket, Server Sent Events 和 Long Polling作為底層傳輸方式實現伺服器端和使用者端的實時資料互動。

Hangfire是一個.NET的開源後臺任務框架 提供統一的程式設計模型,以可靠的方式處理後臺任務

2.目的:

通過SignalR+Hangfire,我們可以實現一些需要較長時間處理的任務,並在完成及時的通知前端處理結果。

3.以下是我使用SignalR+Hangfire的開發需求:

在net6 webapi的情況下,前端是vue+ts,我現在有個需要就是,我寫了一個介面,是對接stable diffusion webui 文生圖的介面,前端第一個人請求,返回圖沒有問題,
但是,此時在生成圖的過程中,第二個人請求,我希望加入到一個佇列或者別的方式 ,把這個請求放著,我處理完第一個請求之後繼續處理第二個,並且告訴使用者,前面有多少個任務需要等待?

我的開發環境,後端是.net7 前端vue3.0,下面是對應安裝和使用教學:

1.Hangfire使用

1.安裝nuget包

由於我使用的mysql,對應包為Hangfire.MySqlStorage,大家根據自己的資料庫選擇安裝對應的包

<PackageReference Include="Hangfire" Version="1.8.2" />
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />

2.新增Hangfire設定

Hangfire的資料是存在資料庫中的,所以在新增設定時候要使用對應的資料庫連線字串。同時,在UseHangfireServer時,我使用了自定義的佇列名稱,並將同時執行的任務數設定為1,以實現任務佇列中的任務唯一,且任務依次執行。

在program.cs中新增以下設定

1.新增Hangfire

程式碼內容:

var connectionString = configuration.GetValue<string>("ConnStr");//資料庫連線設定
// Add Hangfire services.
services.AddHangfire(config =>
{
    config.UseStorage(new MySqlStorage(connectionString, new MySqlStorageOptions
    {
        TablesPrefix = "hangfire_", // 指定表字首
        PrepareSchemaIfNecessary = true // 允許安裝 MySQL 表格(如果不存在的話)
        // 其他儲存選項
    }));
});

2.應用Hangfire

程式碼內容:

// Use Hangfire server and dashboard.
app.UseHangfireServer(new BackgroundJobServerOptions
{
    Queues = new[] { "default", "img-queue" },
    WorkerCount = 1
});
app.UseHangfireDashboard();// 使用 Hangfire 控制面板

3.資料庫設定

設定完成,在使用時,資料庫會生成Hangfire的工作表,如下:

4.Hangfire 控制面板

對應Hangfire 控制面板為 /hangfire

http://localhost:5122/hangfire

1.儀表盤

2.佇列

5.程式碼中的應用

1.發起一個後臺任務

//新增後臺任務
BackgroundJob.Enqueue(() => BackServiceCreateImg(request));

2.後臺任務方法

/// <summary>
/// 後臺任務生成圖片(DisableConcurrentExecution 設定超時時間 Queue設定任務型別)
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
[DisableConcurrentExecution(timeoutInSeconds: 180)]
[Queue("img-queue")]
public async Task BackServiceCreateImg(GraphGenerationRequest request)
{
    //...程式碼邏輯省略
}

3.查詢佇列等待任務數

var queueLength = JobStorage.Current.GetMonitoringApi()
                            .EnqueuedCount("img-queue");//指定的佇列型別的佇列等待任務數

2.SignalR使用

1.後端SignalR使用

由於我使用的.net7,微軟自帶SignalR,我們使用時只需要新增參照

using Microsoft.AspNetCore.SignalR;

1.新增SignalR設定

在program.cs中新增以下設定

1.新增SignalR

程式碼內容:

// SignalR
services.AddSignalR();

2.設定SignalR hub

程式碼內容:

// SignalR hub
app.MapHub<GraphGenerationHub>("/graphhub");

2.建立SignalR hub類

using Hangfire;
using Microsoft.AspNetCore.Cors;
using Microsoft.AspNetCore.SignalR;

namespace ChatGptWebApi.Hubs
{
    [EnableCors("MyPolicy")]
    public class GraphGenerationHub : Hub
    {

        public GraphGenerationHub()
        {
        }

        public long GetWaitingCount()
        {
            return  JobStorage.Current.GetMonitoringApi()
                .EnqueuedCount("img-queue");
        }
    }
}

3.程式碼中的應用

1.依賴注入

通過依賴注入,在要使用的類中注入

private readonly IHubContext<GraphGenerationHub> _hubContext;

4.傳送訊息

向全體傳送

_hubContext.Clients.All.SendAsync("updateWaitingCount", "訊息內容.....");

向指定使用者端傳送

_hubContext.Clients.Client(request.ConnectionId).SendAsync("updateImgUrl", $"生成圖片失敗:{ex.Message}");

2.前端SignalR使用

前端我用的是VUE+TS

1.安裝SignalR包

通過命令使用 pnpm 安裝 @microsoft/signalr

pnpm install @microsoft/signalr

2.頁面中參照@microsoft/signalr

import * as signalR from "@microsoft/signalr";

3.建立一個useSignalR.ts

建立一個useSignalR.ts來專門處理SignalR訊息,然後在需要用到的頁面中參照即可。

程式碼內容:

import { onUnmounted, ref } from 'vue';
import { useMessage } from 'naive-ui'
import { HubConnectionBuilder, HubConnection } from '@microsoft/signalr';

export  function useSignalR(
  hubUrl: string,
  hubName: string
) {
  const connection = ref<HubConnection | null>(null);
  const waitingCount = ref(0);
  const imgUrl = ref([]);
  const ms = useMessage();
  const start = async () => {
    if (connection.value && connection.value.state === 'Connected') return;
    connection.value = getConnection(hubUrl);
    if (connection.value) {
     // 連線 SignalR
     connection.value.start()
     .then(() => {
       console.log('SignalR Connected.');
       // 呼叫 GraphGenerationHub 的 GetWaitingCount 方法獲取佇列等待數
       connection.value?.invoke('GetWaitingCount')
         .then(count => {
           console.log('Waiting Count:', count);
           waitingCount.value = count;
         });
       // 註冊 signalR 接收方法
       connection.value?.on('updateWaitingCount', count => {
         console.log('Waiting Count:', count);
         waitingCount.value = count;
       });
       connection.value?.on('updateImgUrl', newImgUrl => {
        console.log('Waiting imgUrl:', newImgUrl);
        if(typeof newImgUrl === 'string'){
          ms.error(newImgUrl);
        }else{
          ms.success('圖片生成成功。');
        imgUrl.value = newImgUrl;
        }
      });
     })
     .catch(error => {
       console.log('SignalR Connection Error:', error);
     });
    }
  };
  

  const stop = () => {
    connection.value!.stop();
    connection.value = null;
  };

  const getConnection = (
    hubUrl: string
  ): HubConnection => {
    return new HubConnectionBuilder()
      .withUrl(hubUrl)
      .withAutomaticReconnect().build();
  };

  start();

  onUnmounted(() => {
    if (connection.value?.state === 'Connected') connection.value!.stop();
  });

  return {
    connection,
    waitingCount,
    imgUrl,
    start,
    stop
  };
}

4.頁面中的使用

在需要使用signalR的頁面參照useSignalR

 import {useSignalR} from '@/views/chat/hooks/useSignalR';
setup() {
//signalR
const { waitingCount,connection,imgUrl } = useSignalR(apiBaseUrl+'/graphhub');
}

3.案例:SignalR+Hangfire+StableDiffusionAPI 生成圖片

Hangfire實現後臺呼叫StableDiffusion web介面,然後通過SignalR將結果返回給前端。這樣,對StableDiffusion web的效能要求很低。不會因為生成圖片慢,導致http請求超時的情況。大大改善了前後端互動。

1.前端建立SignalR

入上述頁面中使用介紹的一樣,當新增了

const { waitingCount,connection,imgUrl } = useSignalR(apiBaseUrl+'/graphhub');

開啟對應頁面時,就建立了SignalR的連線了。

2.前端發起請求

前端的提交按鈕對應的方法,使用的是axios傳送http請求生成圖片。

程式碼如下:

const submit = async () => {
        const params = {
          Prompt: description.value,
          connectionId:connection.value?.connectionId //SignalR的使用者端連線ID
        };
      try {
      //signalR
      const response = await axios.post(apiUrl+'/GenerateGraph', params);
      if(response.data.status ==='Fail'){
        ms.error(response.data.message ?? 'error')
      return
      }
      usedCount.value=response.data.data;
      ms.success(response.data.message);

    } catch (error) {
      ms.error('報錯拉!:'+error);
    }
    console.log("提交的引數:", params); // 在控制檯輸出提交的引數
  };

3.後端介面和實現

後端介面和實現方法完成定時任務的發起和signalR的訊息推播

後端介面如下:

/// <summary>
/// signalR+hangfire生成圖片
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
[HttpPost]
public async Task<ApiResult<int?>> GenerateGraph(GraphGenerationRequest request)
{
    var res=await _iGptImage.GenerateGraph(request);
    return res;
}

方法實現:

/// <summary>
/// 生成圖片,返回佇列資訊和剩餘次數
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public async Task<ApiResult<int?>> GenerateGraph(Form.GraphGenerationRequest request)
{
    //新增後臺任務
    BackgroundJob.Enqueue(() => BackServiceCreateImg(request));
    string message = await SendWaitingCount("img-queue");
    return new ApiResult<int?>(HttpResultTypeEnum.Success, count - 1, message);
}
 /// <summary>
/// 推播佇列的等待資訊
/// </summary>
/// <param name="enqueue">任務型別</param>
/// <returns></returns>
private async Task<string> SendWaitingCount(string enqueue)
{
    var queueLength = JobStorage.Current.GetMonitoringApi()
        .EnqueuedCount(enqueue);
    string message = $"任務已提交,您前面還有 {queueLength} 個任務正在等待。";
    await _hubContext.Clients.All.SendAsync("updateWaitingCount", queueLength);
    return message;
}

4.案例成果

案例地址(AI聊天+圖片生成):https://ai.terramours.site/

閱讀如遇樣式問題,請前往個人部落格瀏覽: https://www.raokun.top
擁抱ChatGPT:https://ai.terramours.site
開源專案地址:https://github.com/firstsaofan/TerraMours