大家好,我是失業在家,正在找工作的博主Jerry,找工作之餘,總結和整理以前的專案經驗,動手寫了個洋蔥架構(整潔架構)範例解決方案 OnionArch。其目的是為了更好的實現基於DDD(領域驅動分析)和命令查詢職責分離(CQRS)的洋蔥架構。
OnionArch 是用來實現單個微服務的。它提供了Grpc介面和Dapr Side Car進行互動,通過Dapr來實現微服務之間的介面呼叫、事件釋出訂閱等微服務特性。但是,Dapr官方檔案上只有Go語言的Grpc的微服務呼叫範例,沒有事件釋出和訂閱範例,更沒有基於Grpc通訊用.Net實現的事件訂閱和釋出範例。
為了方便大家寫程式碼,本文旨在介紹如何通過Dapr實現.Net Grpc服務之間的釋出和訂閱,並採用與WebApi類似的事件訂閱方式。
如果是Dapr Side Car通過Web Api和微服務參照互動,在WebApi中實現事件訂閱非常簡單,只要在Action 上增加「[Topic("pubsub", "TestTopic")]」 Attribute即可,可如果Dapr是通過Grpc和Grpc服務互動就不能這樣寫了。
為了保持WebApi和Grpc事件訂閱程式碼的一致性,本文就是要在Grpc通訊的情況下實現如下寫法來訂閱並處理事件。
[Topic("pubsub", "TestTopic")] public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context) { string message = "TestTopicEvent" + request.EventData.Name; Console.WriteLine(message); return Task.FromResult(new HelloReply { Message = message }); }
Dapr實現.Net Grpc服務之間的釋出和訂閱,根據官方檔案,需要重寫AppCallback.AppCallbackBase Grpc類的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是給Dapr呼叫獲取該微服務已訂閱的事件,OnTopicEvent給Dapr呼叫以觸發事件到達處理邏輯。但是這樣就需要在AppCallback.AppCallbackBase實現類中寫死已訂閱的事件和事件處理邏輯。顯然不符合我們的實現目標。
參考Dapr SDK中關於WebApi 訂閱查詢介面「http://localhost:<appPort>/dapr/subscribe」的實現程式碼,可以在AppCallback.AppCallbackBase實現類的ListTopicSubscriptions方法中,採用相同的方式,在Grpc方法中查詢Topic Attribute的方式來搜尋已訂閱的事件。這樣就不用在ListTopicSubscriptions中寫死已訂閱的事件了。
為了避免在OnTopicEvent方法中應編碼事件處理邏輯,就需要在接收到事件觸發後動態呼叫Grpc方法。理論上,只要有proto檔案就可以動態呼叫Grpc方法,而proto檔案本來就在專案中。但是,我沒找到.Net動態呼叫Grpc方法的相關資料,不知道大家有沒有?
我這裡採用了另一種方式,根據我上一篇關於.Net 7.0 RC gRPC JSON 轉碼為 Swagger/OpenAPI檔案。Grpc方法可以增加一個轉碼為Json的WebApi呼叫。這樣就可以在OnTopicEvent方法中接收到事件觸發後,通過HttpClient post到對應的WebApi地址,曲線實現動態呼叫Grpc方法。是不是有點脫褲子放屁的感覺?
我的解決方案如下,GrpcServiceA釋出事件,GrpcServiceB接收事件並處理。
GrpcServiceA釋出事件比較簡單,和WebApi的方式是一樣一樣的。
public async override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context) { //await _daprClient.SaveStateAsync("statestore", "testKey", request.Name); EventData eventData = new EventData() { Id = 6, Name = request.Name, Description = "Looking for a job" }; await _daprClient.PublishEventAsync<EventData>("pubsub", "TestTopic", eventData); return new HelloReply { Message = "Hello" + request.Name }; }
_daprClient怎麼來的?我參考Dapr .Net SDK的程式碼,給IGrpcServerBuilder 增加了擴充套件方法:
public static IGrpcServerBuilder AddDapr(this IGrpcServerBuilder builder, Action<DaprClientBuilder> configureClient = null) { if (builder is null) { throw new ArgumentNullException(nameof(builder)); } // This pattern prevents registering services multiple times in the case AddDapr is called // by non-user-code. if (builder.Services.Any(s => s.ImplementationType == typeof(DaprMvcMarkerService))) { return builder; } builder.Services.AddDaprClient(configureClient); builder.Services.AddSingleton<DaprMvcMarkerService>(); return builder; } private class DaprMvcMarkerService { }
然後就可以這樣把DaprClient依賴注入到服務中。
builder.Services.AddGrpc().AddJsonTranscoding().AddDapr();
根據上述實現方案,GrpcServiceB接收事件並處理有點複雜,參考我Grpc介面轉碼Json的的內容,在要接收事件的Grpc方法上增加轉碼Json WebApi 設定。
rpc TestTopicEvent (TestTopicEventRequest) returns (HelloReply){ option (google.api.http) = { post: "/v1/greeter/testtopicevent", body: "eventData" }; }
增加google.api.http選項,,可以通過post eventData 資料到地址「/v1/greeter/testtopicevent」呼叫該Grpc方法。然後實現該Grpc介面。
[Topic("pubsub", "TestTopic")] public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context) { string message = "TestTopicEvent" + request.EventData.Name; Console.WriteLine(message); return Task.FromResult(new HelloReply { Message = message }); }
我重用了Dapr .Net SDK 的Topic Attribute來標記該Grpc的實現介面,這樣就可以搜尋所有帶Topic Attribute的Grpc方法來獲取已經訂閱的事件。
接下來才是重頭戲,重寫AppCallback.AppCallbackBase Grpc介面類的ListTopicSubscriptions方法和OnTopicEvent方法
public async override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context) { var result = new ListTopicSubscriptionsResponse(); var subcriptions = _endpointDataSource.GetDaprSubscriptions(_loggerFactory); foreach (var subscription in subcriptions) { TopicSubscription subscr = new TopicSubscription() { PubsubName = subscription.PubsubName, Topic = subscription.Topic, Routes = new TopicRoutes() }; subscr.Routes.Default = subscription.Route; result.Subscriptions.Add(subscr); } return result; }
該方法返回所有已訂閱的事件和對應的WebApi Url,將事件對應的WebApi地址放入subscr.Routes.Default中。
其中_endpointDataSource.GetDaprSubscriptions 方法參考了Dapr .Net SDK的實現。
public static List<Subscription> GetDaprSubscriptions(this EndpointDataSource dataSource, ILoggerFactory loggerFactory, SubscribeOptions options = null) { var logger = loggerFactory.CreateLogger("DaprTopicSubscription"); var subscriptions = dataSource.Endpoints .OfType<RouteEndpoint>() .Where(e => e.Metadata.GetOrderedMetadata<ITopicMetadata>().Any(t => t.Name != null)) // only endpoints which have TopicAttribute with not null Name. .SelectMany(e => { var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>(); var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>(); var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>(); for (int i = 0; i < topicMetadata.Count(); i++) { subs.Add((topicMetadata[i].PubsubName, topicMetadata[i].Name, (topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic, (topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload, topicMetadata[i].Match, topicMetadata[i].Priority, originalTopicMetadata.Where(m => (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.OwnedMetadatas?.Any(o => o.Equals(m.Id)) == true || string.IsNullOrEmpty(m.Id)) .GroupBy(c => c.Name) .ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()), (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator, e.RoutePattern)); } return subs; }) .Distinct() .GroupBy(e => new { e.PubsubName, e.Name }) .Select(e => e.OrderBy(e => e.Priority)) .Select(e => { var first = e.First(); var rawPayload = e.Any(e => e.EnableRawPayload.GetValueOrDefault()); var metadataSeparator = e.FirstOrDefault(e => !string.IsNullOrEmpty(e.MetadataSeparator)).MetadataSeparator ?? ","; var rules = e.Where(e => !string.IsNullOrEmpty(e.Match)).ToList(); var defaultRoutes = e.Where(e => string.IsNullOrEmpty(e.Match)).Select(e => RoutePatternToString(e.RoutePattern)).ToList(); //var defaultRoute = defaultRoutes.FirstOrDefault(); var defaultRoute = defaultRoutes.LastOrDefault(); //multiple identical names. use comma separation. var metadata = new Metadata(e.SelectMany(c => c.OriginalTopicMetadata).GroupBy(c => c.Key).ToDictionary(c => c.Key, c => string.Join(metadataSeparator, c.SelectMany(c => c.Value).Distinct()))); if (rawPayload || options?.EnableRawPayload is true) { metadata.Add(Metadata.RawPayload, "true"); } if (logger != null) { if (defaultRoutes.Count > 1) { logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName); } var duplicatePriorities = rules.GroupBy(e => e.Priority) .Where(g => g.Count() > 1) .ToDictionary(x => x.Key, y => y.Count()); foreach (var entry in duplicatePriorities) { logger.LogError("A subscription to topic {name} on pubsub {pubsub} has duplicate priorities for {priority}: found {count} occurrences.", first.Name, first.PubsubName, entry.Key, entry.Value); } } var subscription = new Subscription() { Topic = first.Name, PubsubName = first.PubsubName, Metadata = metadata.Count > 0 ? metadata : null, }; if (first.DeadLetterTopic != null) { subscription.DeadLetterTopic = first.DeadLetterTopic; } // Use the V2 routing rules structure if (rules.Count > 0) { subscription.Routes = new Routes { Rules = rules.Select(e => new Rule { Match = e.Match, Path = RoutePatternToString(e.RoutePattern), }).ToList(), Default = defaultRoute, }; } // Use the V1 structure for backward compatibility. else { subscription.Route = defaultRoute; } return subscription; }) .OrderBy(e => (e.PubsubName, e.Topic)); return subscriptions.ToList(); } private static string RoutePatternToString(RoutePattern routePattern) { return string.Join("/", routePattern.PathSegments .Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>() .Select(part => part.Content)))); }
注意標紅的哪一行是我唯一改動的地方,因為Grpc介面增加了Web Api設定後會返回兩個Route,一個是原始Grpc的,一個是WebApi的,我們需要後面那個。
接著重寫OnTopicEvent方法
public async override Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context) { TopicEventResponse topicResponse = new TopicEventResponse(); string payloadString = request.Data.ToStringUtf8(); Console.WriteLine("OnTopicEvent Data:" + payloadString); HttpContent postContent = new StringContent(payloadString, new MediaTypeWithQualityHeaderValue("application/json")); var response = await _httpClient4TopicEvent.PostAsync("http://" + context.Host + "/" + request.Path, postContent); string responseContent = await response.Content.ReadAsStringAsync(); Console.WriteLine(responseContent); if (response.IsSuccessStatusCode) { Console.WriteLine("OnTopicEvent Invoke Success."); topicResponse.Status = TopicEventResponseStatus.Success; } else { Console.WriteLine("OnTopicEvent Invoke Error."); topicResponse.Status = TopicEventResponseStatus.Drop; } return topicResponse; }
這裡簡單處理了事件觸發的返回引數TopicEventResponse ,未處理重試的情況。request.path是在ListTopicSubscriptions方法中返回給Dapr的事件對應的WebApi呼叫地址。
引數_httpClient4TopicEvent是這樣注入的:
builder.Services.AddHttpClient("HttpClient4TopicEvent", httpClient => { httpClient.DefaultRequestVersion = HttpVersion.Version20; httpClient.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrHigher; httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); } );
因為Grpc是基於Http2.0及以上版本的,所以需要修改HtttpClient預設設定,不然無法訪基於Http2.0的WebApi。
然後將我們的AppCallback.AppCallbackBase實現類DaprAppCallbackService Map到GrpcService即可。
app.MapGrpcService<DaprAppCallbackService>();
分別通過Dapr執行ServiceA和ServiceB微服務,注意指定--app-protocol協定為Grpc,我這裡還使用了.Net 熱過載技術。
dapr run --app-protocol grpc --app-id serviceA --app-port 5002 --dapr-grpc-port 50002 -- dotnet watch run --launch-profile https
dapr run --app-protocol grpc --app-id serviceB --app-port 5003 --dapr-grpc-port 50003 -- dotnet watch run --launch-profile https
在ServiceA中釋出事件
在ServiceB中檢視已訂閱的事件和接收到的事件觸發
▪ 博主有15年以上的軟體技術實施經驗(Technical Leader),專注於微服務(Dapr)和雲原生(K8s)軟體架構設計、專注於 .Net Core\Java開發和Devops構建釋出。
▪ 博主10年以上的軟體交付管理經驗(Project Manager & Product Ower),致力於敏捷(Scrum)專案管理、軟體產品業務需求分析和原型設計。
▪ 博主熟練設定和使用 Microsoft Azure雲。
▪ 博主為人誠懇,積極樂觀,工作認真負責。
我家在廣州,也可以去深圳工作。做架構師、產品經理、專案經理都可以。有工作機會推薦的朋友可以加我微信 15920128707,微信名字叫Jerry。
本文原始碼在這裡:iamxiaozhuang/TestDaprGrpcSubscripber (github.com) 大家可以隨便取用。