前言
雲原生應用程式通常需要可延伸的訊息傳遞解決方案,以提供訊息佇列、主題和訂閱等功能。.NET Aspire 元件簡化了連線到各種訊息傳遞提供程式(例如 Azure 服務匯流排)的過程。在本教學中,小編將為大家介紹如何建立一個 ASP.NET Core 應用並將提交的訊息將傳送到服務匯流排主題以供訂閱者使用。
環境準備
要使用 .NET Aspire,需要在本地安裝以下軟體:
設定 Azure 服務匯流排賬戶
az group create -n <your-resource-group-name> -location eastus
az servicebus namespace create -g <your-resource-group-name> --name <your-namespace-name> --location eastus
az servicebus topic create --g <your-resource-group-name> --namespace-name <your-namespace-name> --name notifications
az servicebus topic subscription create --g <your-resource-group-name> --namespace-name <your-namespace-name> --topic-name notifications --name mobile
備註:your-resource-group-name和your-namespace-name替換為自己值即可。
Azure 身份驗證
可以使用無密碼身份驗證或連線字串來完成此快速入門。無密碼連線使用 Azure Active Directory 和基於角色的存取控制 (RBAC) 連線到服務匯流排名稱空間。無需擔心程式碼、組態檔或安全儲存(例如 Azure Key Vault)中存在寫死連線字串。
除此之外,還可以使用連線字串連線到服務匯流排名稱空間,但建議在實際應用程式和生產環境中使用無密碼方法。有關更多資訊,請閱讀身份驗證和授權或存取無密碼概述頁面。
建立專案
新增 Worker Service
接下來,將工作執行緒服務專案新增到解決方案,以檢索和處理髮往 Azure 服務匯流排的訊息。
Visual Studio 將專案新增到您的解決方案中,並使用新的程式碼行更新專案的Program.cs檔案:AspireMessaging.AppHost
builder.AddProject<Projects.AspireMessaging_WorkerService>("aspiremessaging.workerservice");
完整的檔案結構:
將 .NET Aspire 元件新增到 API
將.NET Aspire Azure 服務匯流排元件新增到您的AspireMessaging應用程式:
dotnet add package Aspire.Azure.Messaging.ServiceBus --prerelease
在Razor Pages 專案的Program.csAspireMessaging檔案中,新增對擴充套件方法的呼叫AddAzureServiceBus:
builder.AddAzureServiceBus("serviceBusConnection");
在專案的_appsettings.json檔案中AspireMessaging,新增對應的連線資訊:
{
"ConnectionStrings": {
"serviceBusConnection": "Endpoint=sb://{your_namespace}.servicebus.windows.net/;
SharedAccessKeyName=accesskeyname;SharedAccessKey=accesskey"
}
}
備註:將{your_namespace}替換為自己的服務匯流排空間的名稱
建立 API 端點
提供一個端點來接收資料並將其釋出到服務匯流排主題並向訂閱者廣播。將以下端點新增到AspireMessaging專案中以向主題傳送訊息:
app.MapPost("/notify", static async (ServiceBusClient client, string message) =>
{
var sender = client.CreateSender("notifications");
// Create a batch
using ServiceBusMessageBatch messageBatch =
await sender.CreateMessageBatchAsync();
if (messageBatch.TryAddMessage(
new ServiceBusMessage($"Message {message}")) is false)
{
// If it's too large for the batch.
throw new Exception(
$"The message {message} is too large to fit in the batch.");
}
// Use the producer client to send the batch of
// messages to the Service Bus topic.
await sender.SendMessagesAsync(messageBatch);
Console.WriteLine($"A message has been published to the topic.");
})
將 .NET Aspire 元件新增到 Worker Service
將.NET Aspire Azure 服務匯流排元件新增到AspireMessaging.Worker應用程式:
dotnet add package Aspire.Azure.Messaging.ServiceBus --prerelease
在Razor Pages 專案的Program.csAspireMessaging.Worker檔案中,新增對擴充套件方法的呼叫AddAzureServiceBus:
builder.AddAzureServiceBus("serviceBusConnection");
在專案的_appsettings.json檔案中AspireMessaging.Worker,新增對應的連線資訊:
{
"ConnectionStrings": {
"serviceBusConnection": "Endpoint=sb://{your_namespace}.servicebus.windows.net/;
SharedAccessKeyName=accesskeyname;SharedAccessKey=accesskey"
}
}
備註:將{your_namespace}替換為自己的服務匯流排空間的名稱
處理來自訂閱者的訊息
當新訊息放入佇列時messages,工作服務應檢索、處理和刪除該訊息。更新Worker.cs類以匹配以下程式碼:
public class Worker(
ILogger<Worker> logger,
ServiceBusClient client) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var processor = client.CreateProcessor(
"notifications",
"mobile",
new ServiceBusProcessorOptions());
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
logger.LogInformation(
"Wait for a minute and then press any key to end the processing");
Console.ReadKey();
// stop processing
logger.LogInformation("\nStopping the receiver...");
await processor.StopProcessingAsync();
logger.LogInformation("Stopped receiving messages");
}
}
async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
logger.LogInformation("Received: {Body} from subscription.", body);
// complete the message. messages is deleted from the subscription.
await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
Task ErrorHandler(ProcessErrorEventArgs args)
{
logger.LogError(args.Exception, args.Exception.Message);
return Task.CompletedTask;
}
}
最後:在本地執行並測試應用程式
.net8系列的另外兩篇文章:
.NET 8.0 中有哪些新的變化?
快速入門:構建您的第一個 .NET Aspire 應用程式
擴充套件連結: