在上一篇博文(https://www.cnblogs.com/lulight/p/16525902.html)中,我們成功的以VM作為Kafka伺服器執行,並且驗證了從其他機器中遠端存取。在本文中,將使用Visual Studio 2022建立Azure Function 作為生產者和消費者在本地進行驗證
1:開啟VS 2022,開始建立Azure Funciton工程
2:選擇 Azure Function模板,並使用.NET 6.0作為執行時,然後選擇 Kafka Trigger。其他值保持預設即可。儲存。
3: 把BorkerList新增到本地組態檔中( local.settings.json ),然後修改正確的topic名稱。因為Kafka伺服器沒有啟用SSL和Password,所以這裡 Protocol 和 AuthenticationMode 都需要修改為 NotSet。
local.setting.json 組態檔:
{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "BrokerList": "xxx.xxx.xxx.xxx:9092", "KafkaPassword": "", "ConnectionString": "" } }
KafkaTrigger Function程式碼:
using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Kafka; using Microsoft.Extensions.Logging; namespace FunctionApp2 { public class Function1 { // KafkaTrigger sample // Consume the message from "topic" on the LocalBroker. // Add `BrokerList` and `KafkaPassword` to the local.settings.json // For EventHubs // "BrokerList": "{EVENT_HUBS_NAMESPACE}.servicebus.windows.net:9093" // "KafkaPassword":"{EVENT_HUBS_CONNECTION_STRING} [FunctionName("Function1")] public void Run( [KafkaTrigger("BrokerList", "test_topic", Username = "$ConnectionString", Password = "%KafkaPassword%", Protocol = BrokerProtocol.NotSet, AuthenticationMode = BrokerAuthenticationMode.NotSet, ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log) { foreach (KafkaEventData<string> eventData in events) { log.LogInformation($"C# Kafka trigger function processed a message: {eventData.Value}"); } } } }
4:同樣,繼續新增一個 Kafka output 的Function, (與第二步相同)。其他值保持預設即可。儲存。
5:與第三步相同,修改正確的topic名稱。因為Kafka伺服器沒有啟用SSL和Password,所以這裡 Protocol 和 AuthenticationMode 都需要修改為 NotSet。
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.Azure.WebJobs.Extensions.Kafka; using Microsoft.Extensions.Logging; namespace FunctionApp2 { public class Function2 { // KafkaOutputBinding sample // This KafkaOutput binding will create a my_topic "my_topic" on the LocalBroker if it doesn't exists. // Call this function then the KafkaTrigger will be trigged. [FunctionName("Function2")] public IActionResult Output( [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req, [Kafka("BrokerList", "test_topic", Username = "$ConnectionString", Password = "%KafkaPassword%", Protocol = BrokerProtocol.NotSet, AuthenticationMode = BrokerAuthenticationMode.NotSet )] out string eventData, ILogger log) { log.LogInformation("C# HTTP trigger function processed a request."); string message = req.Query["message"]; string responseMessage = string.IsNullOrEmpty(message) ? "This HTTP triggered function executed successfully. Pass a message in the query string" : $"Message {message} sent to the broker. This HTTP triggered function executed successfully."; eventData = $"Received message: {message}"; return new OkObjectResult(responseMessage); } } }
6:F5執行Function Project,使用HTTP Trigger的URL傳送訊息,然後用Kafka Trigger的函數接受訊息。
整個步驟的範例動畫:
適用於 Azure Functions 的 Apache Kafka 繫結概述:https://docs.azure.cn/zh-cn/azure-functions/functions-bindings-kafka?tabs=in-process%2Cportal&pivots=programming-language-csharp
【END】
當在複雜的環境中面臨問題,格物之道需:濁而靜之徐清,安以動之徐生。 雲中,恰是如此!