.NET Core中使用gRPC

2022-09-08 06:00:26
1.什麼是gRPC
1.基本介紹

gRPC 一開始由 google 開發,是一款語言中立、平臺中立、開源的遠端過程呼叫(RPC)系統,所以叫g(google)RPC。支援主流開發語言(C, C++, Python, PHP, Ruby, NodeJS, C#, Objective-C、Golang

2.proto檔案

用於定義協定介面和資料格式,不同的語言,相同的檔案,可以理解為一項約定,序列化支援 PB(Protocol buffer)和 JSON,PB 是一種語言無關的高效能序列化框架,基於 HTTP/2 + PB, 保障了 RPC 呼叫的高效能。

說這麼多感覺還是很模糊,上面只是介紹了gRPC是什麼,在我看來其實它大致的作用跟WebServicesWCF差不多,在某個維度上可以說都是作為遠端呼叫,只不過所處的時代和本身的特性,以及生態的發展下,導致它成為目前比較火熱的原因之一,具體的內容後面再討論,先用起來,再深入瞭解,接下來我們使用.Net Core 先搭建一個簡單的Demo,來親自上手實踐一下。

其實背景就是最近在做一個專案,需要做一個公司內部的Nuget包,大概的業務就是Nuget包請求微服務資料,開始想直接使用http的方式,基於整體專案結構後面定了使用gRPC,既學即用,剛好也可以在實際專案應用中,查漏補缺。

3.上手實踐

1.使用vs首先建立一個NetCore gRPC專案,得到一個專案結構如下,框架預設包含一個已經預先定義協定檔案服務介面,如果使用其他的方式也很簡單直接參照相關的包,然後新增以下服務就可以了

2.我們自己建立一個自己的介面,定義一個協定檔案mytestdemo.proto,然後定義一些方法,主要包含如下幾類,其他的一些用法可以在網上搜到,或者去看檔案,只是簡單列一下

1.有引數有返回值

2.無引數有返回值 ,無參使用google.protobuf.Empty

3.集合作為返回值,必須使用repeated 標記

如果你真的不熟悉protobuf的定義方式和寫法,這個無傷大雅,可以使用工具生成

syntax = "proto3";
//引入集合包
import "google/protobuf/empty.proto";
//名稱空間
option csharp_namespace = "GrpcDemo";
//包名
package MyTest;
//介面定義
service MyTestDemo {
  rpc MultipleParam(MultipleRequestPara) returns (MultipleRespone);
  rpc NoParam(google.protobuf.Empty) returns (SingeRespone);
  rpc CollectionParam(google.protobuf.Empty) returns (CollectionResponePara);
}

//多引數請求引數
message MultipleRequestPara {
  int32 Id = 1;
  string Name = 2;//引數個數
  bool IsExists =3;
}
message SingeRespone {
  bool Success =1;
  TestEntity a1 = 2;
  message TestEntity{
	int32 Id =1;
  }
}
//多引數返回
message MultipleRespone {
	bool Success =1;
}
//返回集合引數
message CollectionResponePara {
	repeated CollectionChildrenRespone1 param1 =1;
	repeated CollectionChildrenRespone2 param2 =2;
	repeated int32 param3 =3;
}
//集合屬性1
message CollectionChildrenRespone1 {
	int32 Id =1;
}
//集合屬性2
message CollectionChildrenRespone2 {
	string Name =1;	
}

3.右鍵類,選擇新增,選擇連線的服務,新增gRPC,或者直接修改專案檔案,將新建的proto新增到類中

3.1 重新生成,然後建立服務程式碼MyTestService,如下程式碼
3.2 在啟動類中對映gRPC app.MapGrpcService<MyTestService>(); 否則會報service is unimplemented.

/// <summary>
/// 繼承自MyTestDemo.MyTestDemoBase
/// </summary>
public class MyTestService : MyTestDemo.MyTestDemoBase
{
    public override async Task<MultipleRespone> MultipleParam(MultipleRequestPara request, ServerCallContext context)
    {
        return await Task.FromResult(new MultipleRespone
        {
            Success = true,
        });
    }

    public override async Task<SingeRespone> NoParam(Empty request, ServerCallContext context)
    {
       TestEntity t = new TestEntity();
       t.Id = 1;
       return await Task.FromResult(new SingeRespone { Success = true, entity = t  }); ;
    }

    public override async Task<CollectionResponePara> CollectionParam(Empty request, ServerCallContext context)
    {
        CollectionResponePara collectionResponePara = new CollectionResponePara();
        CollectionChildrenRespone1 a = new CollectionChildrenRespone1 { Id = 1 };
        CollectionChildrenRespone2 b = new CollectionChildrenRespone2 { Name = "jeck" };
        collectionResponePara.Param1.Add(a);
        collectionResponePara.Param2.Add(b);
        return  await  Task.FromResult(collectionResponePara);
    }
}

4.建立使用者端,將proto檔案拷貝過去呼叫,新增服務為使用者端模式,然後新增如下程式碼

 using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
 {
     var client =  new MyTestDemo.MyTestDemoClient(channel);
     //多引數呼叫
     var reply = client.MultipleParam(new MultipleRequestPara { Id = 123, Name = "sa", IsExists = true });  
     //無參呼叫
     var singeRespone = client.NoParam(new Google.Protobuf.WellKnownTypes.Empty());
     //呼叫集合
     var collectionResponePara = client.CollectionParam(new Google.Protobuf.WellKnownTypes.Empty());
 }

2.gRPC流

gRPC中支援4種流,分別是:

1.簡單 RPC(Unary RPC)它的特點是傳入一個請求物件,返回一個請求物件

2.伺服器端流式 RPC (Server streaming RPC)使用者端傳入一個請求物件,伺服器端可以返回多個結果物件,形象的表示就是使用者端傳入一個股票的id,伺服器端就將股票的資訊遠遠不斷地返回

3.使用者端流式 RPC (Client streaming RPC) 使用者端源源不斷的傳入多個請求物件,伺服器端返回一個結果物件,形象的表範例如上位機採集實時將採集資料,源源不斷的傳入伺服器

4.雙向流式 RPC (Bi-directional streaming RPC) 結合伺服器端和使用者端流,傳入多請求返回多個結果,相當於建立長連線,可以進行相互的操作

下面我們就主要介紹幾類主要的流的使用以及步驟

1.伺服器端流、使用者端流、雙向流

伺服器端流主要的特徵就是伺服器端會源源不斷的響應資料到使用者端

1.首先還是建立protobuf檔案,宣告一個伺服器端流的rpc介面ExcuteServerStream 和一個使用者端流介面ExcuteClientStream

syntax = "proto3";
option csharp_namespace = "GrpcDemo";
package streamtest;

service StreamTest {
  //伺服器端流定義
  rpc ExcuteServerStream(StreamForClientRequest) returns (stream StreamForClientRespones);
  //使用者端流定義
  rpc ExcuteServerStream(StreamForClientRequest) returns (stream StreamForClientRespones);
  //雙向流
  rpc ExcuteMutualStream(stream StreamForClientRequest) returns ( stream StreamForClientRespones);
}

//呼叫流的請求物件
message StreamForClientRequest{
    int32 Id=1;
}

//呼叫端流的返回物件
message StreamForClientRespones{
	repeated int32 Number=1;//集合
}

2.重新生成服務參照,然後建立對應的實現介面StreamTestService並重寫生成的服務,然後在啟動程式對映服務介面

//伺服器端流介面
public override async Task ExcuteServerStream(StreamForClientRequest req,IServerStreamWriter<StreamForClientRespones> resStream,ServerCallContext context)
{
    //list集合作為模擬資料來源
    var list = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
    foreach (var item in list)
    {
        Console.WriteLine($"********{item}*******");
        var ele = new StreamForClientRespones();
        ele.Number.Add(item);
        //寫入流中
        await resStream.WriteAsync(ele);
        //模擬源源不斷的資料響應
        await Task.Delay(1000);
    }
}

//使用者端流介面
public override async Task<StreamForClientRespones> ExcuteClientStream( IAsyncStreamReader<StreamForClientRequest> requestStream, ServerCallContext context)
{
    StreamForClientRespones intArrayModel = new StreamForClientRespones();
    //獲取請求流中的資料
    while (await requestStream.MoveNext())
    {
        intArrayModel.Number.Add(requestStream.Current.Id + 1);
        Console.WriteLine($"ExcuteClientStream Number {requestStream.Current.Id} 獲取到並處理.");
        Thread.Sleep(100);
    }
    return intArrayModel;
}

//雙向流
public override async Task ExcuteMutualStream(IAsyncStreamReader<StreamForClientRequest> reqStream,IServerStreamWriter<StreamForClientRespones> resStream,ServerCallContext context)
{
    int i = 0;
    //從流中獲取請求
    while (await reqStream.MoveNext())
    {
        i++;
        var ele = new StreamForClientRespones();
        ele.Number.Add(i);
        //寫入響應流
        await resStream.WriteAsync(ele);
        await Task.Delay(500);
    }
}


3.建立使用者端呼叫,把伺服器端的protobuf檔案拷貝到使用者端,然後生成,啟動呼叫

//呼叫伺服器端流 
using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
 {
     var client = new StreamTest.StreamTestClient(channel);
     //呼叫伺服器端流
     var reply =  client.ExcuteServerStream(new StreamForClientRequest { Id =1});

     //利用執行緒取消
     //CancellationTokenSource cts = new CancellationTokenSource();
     //指定在2s後進行取消操作
     //cts.CancelAfter(TimeSpan.FromSeconds(2.5)); 
     //var reply = client.ExcuteServerStream(new StreamForClientRequest { Id = 1 }, cancellationToken: cts.Token);

     await foreach (var resp in reply.ResponseStream.ReadAllAsync())
     {
         Console.WriteLine(resp.Number[0]);
     }
 }

 //呼叫使用者端流
 using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
 {
     var client = new StreamTest.StreamTestClient(channel);
     //呼叫使用者端流介面
     var reply = client.ExcuteClientStream();

     //模擬源源不斷的資料傳送
     for (int i = 0; i < 10; i++)
     {
         await reply.RequestStream.WriteAsync(new StreamForClientRequest() { Id = new Random().Next(0, 20) });
         await Task.Delay(100);
     }
     Console.WriteLine("*************傳送完畢*******************");
     await reply.RequestStream.CompleteAsync();
     //接受結果
     foreach (var item in reply.ResponseAsync.Result.Number)
     {
         Console.WriteLine($"This is {item} Result");
     }
 }

//雙向流
using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
{
    var client = new StreamTest.StreamTestClient(channel);
    //呼叫雙向流介面
    var reply = client.ExcuteMutualStream();
    //獲取流放入執行緒
    var bathCatRespTask = Task.Run(async () =>
    {
        await foreach (var resp in reply.ResponseStream.ReadAllAsync())
        {
            Console.WriteLine(resp.Number[0]);
        }
    });

    //寫入流
    for (int i = 0; i < 10; i++)
    {
        await reply.RequestStream.WriteAsync(new StreamForClientRequest() { Id = new Random().Next(0, 20) });
        await Task.Delay(100);
    }
    //傳送完畢
    await reply.RequestStream.CompleteAsync();
    //開始接收響應
    await bathCatRespTask;
}

2.NetCore Web專案作為使用者端

1.首先還是先引入proto檔案,然後生成使用者端

2.在web專案中的控制器中,我們就不能直接簡陋的使用 using的方式來連線gRPC伺服器端了,可以利用內建的依賴注入的模式來完成

3.下載Grpc.Net.ClientFactory包,然後在`Program將使用者端新增到依賴注入容器

builder.Services.AddGrpcClient<MyTestDemo.MyTestDemoClient>(option => {
    option.Address = new Uri("https://localhost:7245");
});

4.然後在控制器中直接注入,就可以使用

 public class gRPCTestController : ControllerBase
 {
     private readonly MyTestDemoClient _client;
     public gRPCTestController(MyTestDemoClient client)
     {
         _client = client;
     }

     [HttpGet(Name = "Excute")]
     public async Task<string> Get()
     {
         var a = await _client.NoParamAsync(new Google.Protobuf.WellKnownTypes.Empty());
         var str = a.Success.ToString();
         return str;
     }
 }

5.呼叫出現如下問題 ,使用dotnet dev-certs https --trust

3.gRPC AOP攔截

有時候我們想在gRPC服務執行前後做一些操作,這時候可以使用其Aop攔截,如果你要問攔截器可以做什麼,我不太想解釋,繼續往下看,攔截器方法定義在Interceptor類中,伺服器端和使用者端攔截是一樣的原理,下面列舉一些攔截器:

名稱 特點
BlockingUnaryCall 攔截阻塞呼叫
AsyncUnaryCall 攔截非同步呼叫
AsyncServerStreamingCall 攔截非同步伺服器端流呼叫
AsyncClientStreamingCall 攔截非同步使用者端流呼叫
AsyncDuplexStreamingCall 攔截非同步雙向流呼叫
UnaryServerHandler 用於攔截和傳入普通呼叫的伺服器端處理程式
ClientStreamingSerHandler 用於攔截使用者端流呼叫的伺服器端處理程式
ServerStreamingSerHandler 用於攔截伺服器端流呼叫的伺服器端處理程式
DuplexStreamingSerHandler 用於攔截雙向流呼叫的伺服器端處理程式

1.宣告一個UnaryServerHandlerInterceptor型別的自定義攔截器,用於攔截和傳入普通呼叫的伺服器端處理程式,然後繼承自Grpc.Core.Interceptors.Interceptor類, 重寫已經定義的方法UnaryServerHandler

public class UnaryServerHandlerInterceptor : Interceptor
{
    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
       TRequest request,
       ServerCallContext context,
       UnaryServerMethod<TRequest, TResponse> continuation)
    {
        Console.WriteLine("執行呼叫前");
        var result = await continuation(request, context);
        Console.WriteLine("執行呼叫後");
        // 或向 使用者端附加 一些資訊
        // 也可以 用try catch 做異常紀錄檔
        // 可以從 context中取出 呼叫方ip,做ip限制
        // 可以 監控continuation 的 執行時間
        return result;
    } 
}

2.然後在注入容器時加入選項

builder.Services.AddGrpc(option => { 
    option.EnableDetailedErrors = true;
    //加入伺服器端攔截器選項
    option.Interceptors.Add<UnaryServerHandlerInterceptor>();
});