前面我們的EventBus已經弄好了,那麼接下來通過EventBus來實現我們的訊息推播就是自然而然的事情了。
說到訊息推播,很多人肯定會想到Websocket,既然我們使用Asp.net core,那麼SignalR肯定是我們的首選。
接下來就用SignalR來實現我們的訊息實時推播。
首選我們需要建立一個Hub,用於連線SignalR。
新增NotificationHub類繼承SignalR.Hub
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Localization;
using Wheel.Notifications;
namespace Wheel.Hubs
{
public class NotificationHub : Hub
{
protected IStringLocalizer L;
public NotificationHub(IStringLocalizerFactory localizerFactory)
{
L = localizerFactory.Create(null);
}
public override async Task OnConnectedAsync()
{
if (Context.UserIdentifier != null)
{
var wellcome = new NotificationData(NotificationType.WellCome)
.WithData("name", Context.User!.Identity!.Name!)
.WithData("message", L["Hello"].Value);
await Clients.Caller.SendAsync("Notification", wellcome);
}
}
}
}
這裡重寫OnConnectedAsync,當用戶授權連線之後,立馬推播一個Hello的訊息。
為了方便並且統一結構,我們最好約定一組通知格式,方便使用者端處理訊息。
建立一個NotificationData類:
namespace Wheel.Notifications
{
public class NotificationData
{
public NotificationData(NotificationType type)
{
Type = type;
}
public NotificationType Type { get; set; }
public IDictionary<string, object> Data { get; set; } = new Dictionary<string, object>();
public NotificationData WithData(string name, object value)
{
Data.Add(name, value);
return this;
}
}
public enum NotificationType
{
WellCome = 0,
Info = 1,
Warn = 2,
Error = 3
}
}
NotificationData包含訊息通知型別Type,以及訊息資料Data。
有時候我們可以能需要自定義使用者表示,那麼就需要實現一個自定義的IUserIdProvider。
using Microsoft.AspNetCore.SignalR;
using System.Security.Claims;
using Wheel.DependencyInjection;
namespace Wheel.Hubs
{
public class UserIdProvider : IUserIdProvider, ISingletonDependency
{
public string? GetUserId(HubConnectionContext connection)
{
return connection.User?.Claims?.FirstOrDefault(a=> a.Type == ClaimTypes.NameIdentifier)?.Value;
}
}
}
在Program中我們需要註冊SignalR以及設定SignalR中介軟體。
新增程式碼:
builder.Services.AddAuthentication(IdentityConstants.BearerScheme)
.AddBearerToken(IdentityConstants.BearerScheme, options =>
{
options.Events = new BearerTokenEvents
{
OnMessageReceived = context =>
{
var accessToken = context.Request.Query["access_token"];
// If the request is for our hub...
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken) &&
(path.StartsWithSegments("/hubs")))
{
// Read the token out of the query string
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
});
builder.Services.AddSignalR()
.AddJsonProtocol()
.AddMessagePackProtocol()
.AddStackExchangeRedis(builder.Configuration["Cache:Redis"]);
在AddBearerToken設定從Query中讀取access_token,用於SignalR連線是從Url獲取認證的token。
這裡註冊SignalR並支援JSON和二進位制MessagePackProtocol協定。
AddStackExchangeRedis表示用Redis做Redis底板,用於橫向擴充套件。
設定中介軟體
app.MapHub<NotificationHub>("/hubs/notification");
有時候我們有些任務可能非實時響應,等待後端處理完成後,再給使用者端發出一個訊息通知。或者其他各種訊息通知的場景,那麼配合EventBus就可以非常靈活了。
接下來我們來模擬一個測試場景
建立NotificationEventData
using MediatR;
namespace Wheel.Handlers
{
public class NotificationEventData : INotification
{
public string Message { get; set; }
}
}
建立NotificationEventHandler
using Microsoft.AspNetCore.SignalR;
using Wheel.EventBus.Local;
using Wheel.Hubs;
using Wheel.Notifications;
namespace Wheel.Handlers
{
public class NotificationEventHandler : ILocalEventHandler<NotificationEventData>
{
private readonly IHubContext<NotificationHub> _hubContext;
public NotificationEventHandler(IHubContext<NotificationHub> hubContext)
{
_hubContext = hubContext;
}
public async Task Handle(NotificationEventData eventData, CancellationToken cancellationToken = default)
{
var wellcome = new NotificationData(NotificationType.WellCome)
.WithData(nameof(eventData.Message), eventData.Message);
await _hubContext.Clients.All.SendAsync("Notification", wellcome);
}
}
}
建立NotificationController
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Wheel.Handlers;
namespace Wheel.Controllers
{
[Route("api/[controller]")]
[ApiController]
[AllowAnonymous]
public class NotificationController : WheelControllerBase
{
[HttpGet]
public async Task<IActionResult> Test()
{
await LocalEventBus.PublishAsync(new NotificationEventData { Message = Guid.NewGuid().ToString() });
return Ok();
}
}
}
啟動專案
先獲取一個token
然後用搞一個SignalR使用者端連線
using Microsoft.AspNetCore.SignalR.Client;
using System.Text.Json;
using System.Text.Json.Serialization;
var connection = new HubConnectionBuilder()
.WithUrl("https://localhost:7080/hubs/notification?access_token=CfDJ8PRWI6x4TXdPnDiVcuLDwVtyEhzhaNmV9ggxR0_i0_godBkw1wRkg0ct0DezjpwbJb7s6VJxvr3V8mEGE9d9klp_Bhjv2AZE3eQ78KmJygizroSpfFHeoImRaEYIyLNXkHrNEG-MuszVQ6eVFHORm5Kkv-Rux7_1RkVam0tsPYiypRQhcJqUuV3pbeiblOQpJ1WXikmpZ8-jFSqwkNMSBhUx2w50iTWYiEyqpiyrjQqu69NfEregcwxJBOji4dmxiu1Q4tyaFZMyZ3m10tFrSqHuF0cRBXDUf5BHSBGg0b7LImROubDrn5y_ogBmhd3J165gnbjRDnGvmYr6hQjI1ZmfhR_NyriG9zQ7jE5oZDFIUsXgd0Yqod8HTMlTzxY0gSFglPy-vPhzBVD4-WxRSaCtCaReQHVJUZ-SB15cfmvHXdPN9tjsVlMwlK8nWCuPJmnWdgsfEx8QJisPvfzhH_dosPvFQf1nNH3Gz_9NT858SauuXCXj3AKE48Bh4XY6avpO4GFEdlMgYHmCius1BEqlq8KQB9SVuJFLcvhKt0Xbz_TEYiN0LtBC7Ot4FNOvBOy0a9VswuYII_nAMgnRN4dZTz8z8vNS7Yd1zbDY6mL86OuqvhMhEgzEpgkjhdaBvq13fDTtGKmw6bZXLstYH_kDaXGKxzfP38WSoxZ9EI8LyPpoZzhqUeexEGbwhYRWM9zNFH_wvwUGMUvWne4_ZeVqVir8obns496infwK9x4WCfL91YC7_ac7Q7t5HLg9py_NBXmsHXXrs_2kdA5F6DI")
.Build();
connection.On<NotificationData>("Notification", (data) =>
{
var newMessage = JsonSerializer.Serialize(data);
Console.WriteLine($"{DateTime.Now}---{newMessage}");
});
await connection.StartAsync();
Console.ReadKey();
public class NotificationData
{
public NotificationData(NotificationType type)
{
Type = type;
}
public NotificationType Type { get; set; }
public IDictionary<string, object> Data { get; set; } = new Dictionary<string, object>();
public NotificationData WithData(string name, object value)
{
Data.Add(name, value);
return this;
}
}
public enum NotificationType
{
WellCome = 0,
Info = 1,
Warn = 2,
Error = 3
}
啟動程式,由於我們帶了accessToken連線,所以連上立馬就收到Hello的訊息推播。
呼叫API發起推播通知。
可以看到成功接收到了訊息通知。
對接非常容易且靈活。
就這樣我們輕輕鬆鬆完成了訊息實時通知的功能整合。
輪子倉庫地址https://github.com/Wheel-Framework/Wheel
歡迎進群催更。