造輪子之訊息實時推播

2023-10-13 18:01:36

前面我們的EventBus已經弄好了,那麼接下來通過EventBus來實現我們的訊息推播就是自然而然的事情了。
說到訊息推播,很多人肯定會想到Websocket,既然我們使用Asp.net core,那麼SignalR肯定是我們的首選。
接下來就用SignalR來實現我們的訊息實時推播。

NotificationHub

首選我們需要建立一個Hub,用於連線SignalR。
新增NotificationHub類繼承SignalR.Hub

using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Localization;
using Wheel.Notifications;

namespace Wheel.Hubs
{
    public class NotificationHub : Hub
    {
        protected IStringLocalizer L;

        public NotificationHub(IStringLocalizerFactory localizerFactory)
        {
            L = localizerFactory.Create(null);
        }

        public override async Task OnConnectedAsync()
        {
            if (Context.UserIdentifier != null)
            {
                var wellcome = new NotificationData(NotificationType.WellCome)
                    .WithData("name", Context.User!.Identity!.Name!)
                    .WithData("message", L["Hello"].Value);
                await Clients.Caller.SendAsync("Notification", wellcome);
            }
        }
    }
}

這裡重寫OnConnectedAsync,當用戶授權連線之後,立馬推播一個Hello的訊息。

約定訊息通知結構

為了方便並且統一結構,我們最好約定一組通知格式,方便使用者端處理訊息。
建立一個NotificationData類:

namespace Wheel.Notifications
{
    public class NotificationData
    {
        public NotificationData(NotificationType type)
        {
            Type = type;
        }

        public NotificationType Type { get; set; }

        public IDictionary<string, object> Data { get; set; } = new Dictionary<string, object>();

        public NotificationData WithData(string name, object value) 
        {
            Data.Add(name, value);
            return this;
        }
    }
    public enum NotificationType
    {
        WellCome = 0,
        Info = 1,
        Warn = 2,
        Error = 3
    }
}

NotificationData包含訊息通知型別Type,以及訊息資料Data。

自定義UserIdProvider

有時候我們可以能需要自定義使用者表示,那麼就需要實現一個自定義的IUserIdProvider。

using Microsoft.AspNetCore.SignalR;
using System.Security.Claims;
using Wheel.DependencyInjection;

namespace Wheel.Hubs
{
    public class UserIdProvider : IUserIdProvider, ISingletonDependency
    {
        public string? GetUserId(HubConnectionContext connection)
        {
            return connection.User?.Claims?.FirstOrDefault(a=> a.Type == ClaimTypes.NameIdentifier)?.Value;
        }
    }
}

設定SignalR

在Program中我們需要註冊SignalR以及設定SignalR中介軟體。
新增程式碼:

builder.Services.AddAuthentication(IdentityConstants.BearerScheme)
    .AddBearerToken(IdentityConstants.BearerScheme, options =>
    {
        options.Events = new BearerTokenEvents
        {
            OnMessageReceived = context =>
            {
                var accessToken = context.Request.Query["access_token"];
                // If the request is for our hub...
                var path = context.HttpContext.Request.Path;
                if (!string.IsNullOrEmpty(accessToken) &&
                    (path.StartsWithSegments("/hubs")))
                {
                    // Read the token out of the query string
                    context.Token = accessToken;
                }
                return Task.CompletedTask;
            }
        };
    });

builder.Services.AddSignalR()
    .AddJsonProtocol()
    .AddMessagePackProtocol()
    .AddStackExchangeRedis(builder.Configuration["Cache:Redis"]);

在AddBearerToken設定從Query中讀取access_token,用於SignalR連線是從Url獲取認證的token。
這裡註冊SignalR並支援JSON和二進位制MessagePackProtocol協定。
AddStackExchangeRedis表示用Redis做Redis底板,用於橫向擴充套件。
設定中介軟體

app.MapHub<NotificationHub>("/hubs/notification");

就這樣完成了我們SignalR的整合。

配合EventBus進行推播

有時候我們有些任務可能非實時響應,等待後端處理完成後,再給使用者端發出一個訊息通知。或者其他各種訊息通知的場景,那麼配合EventBus就可以非常靈活了。
接下來我們來模擬一個測試場景
建立NotificationEventData

using MediatR;

namespace Wheel.Handlers
{
    public class NotificationEventData : INotification
    {
        public string Message { get; set; }
    }
}

建立NotificationEventHandler

using Microsoft.AspNetCore.SignalR;
using Wheel.EventBus.Local;
using Wheel.Hubs;
using Wheel.Notifications;

namespace Wheel.Handlers
{
    public class NotificationEventHandler : ILocalEventHandler<NotificationEventData>
    {
        private readonly IHubContext<NotificationHub> _hubContext;

        public NotificationEventHandler(IHubContext<NotificationHub> hubContext)
        {
            _hubContext = hubContext;
        }

        public async Task Handle(NotificationEventData eventData, CancellationToken cancellationToken = default)
        {
            var wellcome = new NotificationData(NotificationType.WellCome)
                .WithData(nameof(eventData.Message), eventData.Message);
            await _hubContext.Clients.All.SendAsync("Notification", wellcome);
        }
    }
}

建立NotificationController

using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Wheel.Handlers;

namespace Wheel.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    [AllowAnonymous]
    public class NotificationController : WheelControllerBase
    {
        [HttpGet]
        public async Task<IActionResult> Test()
        {
            await LocalEventBus.PublishAsync(new NotificationEventData { Message = Guid.NewGuid().ToString() });
            return Ok();
        }
    }
}

啟動專案
先獲取一個token

然後用搞一個SignalR使用者端連線

using Microsoft.AspNetCore.SignalR.Client;
using System.Text.Json;
using System.Text.Json.Serialization;

var connection = new HubConnectionBuilder()
    .WithUrl("https://localhost:7080/hubs/notification?access_token=CfDJ8PRWI6x4TXdPnDiVcuLDwVtyEhzhaNmV9ggxR0_i0_godBkw1wRkg0ct0DezjpwbJb7s6VJxvr3V8mEGE9d9klp_Bhjv2AZE3eQ78KmJygizroSpfFHeoImRaEYIyLNXkHrNEG-MuszVQ6eVFHORm5Kkv-Rux7_1RkVam0tsPYiypRQhcJqUuV3pbeiblOQpJ1WXikmpZ8-jFSqwkNMSBhUx2w50iTWYiEyqpiyrjQqu69NfEregcwxJBOji4dmxiu1Q4tyaFZMyZ3m10tFrSqHuF0cRBXDUf5BHSBGg0b7LImROubDrn5y_ogBmhd3J165gnbjRDnGvmYr6hQjI1ZmfhR_NyriG9zQ7jE5oZDFIUsXgd0Yqod8HTMlTzxY0gSFglPy-vPhzBVD4-WxRSaCtCaReQHVJUZ-SB15cfmvHXdPN9tjsVlMwlK8nWCuPJmnWdgsfEx8QJisPvfzhH_dosPvFQf1nNH3Gz_9NT858SauuXCXj3AKE48Bh4XY6avpO4GFEdlMgYHmCius1BEqlq8KQB9SVuJFLcvhKt0Xbz_TEYiN0LtBC7Ot4FNOvBOy0a9VswuYII_nAMgnRN4dZTz8z8vNS7Yd1zbDY6mL86OuqvhMhEgzEpgkjhdaBvq13fDTtGKmw6bZXLstYH_kDaXGKxzfP38WSoxZ9EI8LyPpoZzhqUeexEGbwhYRWM9zNFH_wvwUGMUvWne4_ZeVqVir8obns496infwK9x4WCfL91YC7_ac7Q7t5HLg9py_NBXmsHXXrs_2kdA5F6DI")
    .Build();

connection.On<NotificationData>("Notification", (data) =>
{
    var newMessage = JsonSerializer.Serialize(data);
    Console.WriteLine($"{DateTime.Now}---{newMessage}");
});
await connection.StartAsync();

Console.ReadKey();
public class NotificationData
{
    public NotificationData(NotificationType type)
    {
        Type = type;
    }

    public NotificationType Type { get; set; }

    public IDictionary<string, object> Data { get; set; } = new Dictionary<string, object>();

    public NotificationData WithData(string name, object value)
    {
        Data.Add(name, value);
        return this;
    }
}
public enum NotificationType
{
    WellCome = 0,
    Info = 1,
    Warn = 2,
    Error = 3
}

啟動程式,由於我們帶了accessToken連線,所以連上立馬就收到Hello的訊息推播。

呼叫API發起推播通知。

可以看到成功接收到了訊息通知。
對接非常容易且靈活。

就這樣我們輕輕鬆鬆完成了訊息實時通知的功能整合。

輪子倉庫地址https://github.com/Wheel-Framework/Wheel
歡迎進群催更。