雲原生時代崛起的程式語言Go遠端呼叫gRPC實戰

2023-06-15 06:01:35

@

概述

定義

gRPC 官網地址 https://grpc.io/ 原始碼release最新版本v1.55.1

gRPC 官網檔案地址 https://grpc.io/docs/

gRPC 原始碼地址 https://github.com/grpc/grpc

gRPC是一個現代的開源高效能遠端過程呼叫(RPC)框架,可以在任何環境中執行。它可以高效地連線資料中心內和跨資料中心的服務,支援負載平衡、跟蹤、執行狀況檢查和身份驗證;同時也是一個CNCF孵化專案。

簡單說gRPC是基於tcp協定使用http2.0,採用Protocol Buffers定義介面,因而相對於傳統的Restful API來說,速度更快,資料更小,介面要求更嚴謹。

背景

在當前分散式和微服務主宰時代,服務拆分後服務與服務之間的通訊就是程序與程序或伺服器與伺服器之間的呼叫,或許你馬上就說可以採用http,http雖然便捷方便,但效能較低,這時間我們可以採用RPC(Remote Procredure Call)來實現,通過自定義協定發起TCP呼叫來提高傳輸效率;

RPC是一款語言中立、平臺中立、開源的遠端過程呼叫技術,使用者端和伺服器端可以在多種環境中執行和互動,使用者端和伺服器端可以採用不同語言開發。資料在進行網路傳輸的時候需要先進行序列化,而序列化協定有很多種,比如XML、Json、Thrift、Avro、Hessian、Kryo、Protocol Buffers、ProtoStuff。

  • 序列化(serialization)就是將物件序列化為二進位制形式(位元組陣列),一般也將序列化稱為編碼(Encode),主要用於網路傳輸、資料持久化等;
  • 反序列化(deserialization)則是將從網路、磁碟等讀取的位元組陣列還原成原始物件,以便後續業務的進行,一般也將反序列化稱為解碼(Decode),主要用於網路傳輸物件的解碼,以便完成遠端呼叫。

原生rpc(在go標準庫net/rpc包下)編寫相對複雜,需要自己去關注實現過程,沒有程式碼提示。因此更多會使用gRPC。在gRPC中,客戶機應用程式可以直接呼叫不同機器上的伺服器應用程式上的方法,就像它是本地物件一樣,使得更容易建立分散式應用程式和服務。gRPC基於定義服務的思想,指定可以遠端呼叫的方法及其引數和返回型別。在伺服器端,伺服器實現這個介面,並執行gRPC伺服器來處理使用者端呼叫。在使用者端,使用者端有一個提供相同方法的存根(在某些語言中僅稱為使用者端)。gRPC使用者端和伺服器可以在各種環境中執行並相互通訊——從Google內部的伺服器到您自己的桌面——並且可以用任何gRPC支援的語言編寫;如可以輕鬆地用Java建立gRPC伺服器,用Go、Python或Ruby建立使用者端。

特點

  • 簡單服務定義:使用Protocol Buffers(一種強大的二進位制序列化工具集和語言)定義服務,設計一個新的協定,需要準確,高效和語言獨立。
  • 低延遲、高可延伸、分散式系統:快速開始並擴大規模,用一行程式碼安裝執行時和開發環境,還可以使用框架擴充套件到每秒數百萬個rpc。序列化後體積小,序列化和反序列化速度快。
  • 跨語言和平臺工作:以各種語言和平臺為您的服務自動生成慣用的客戶機和伺服器存根.開發與雲伺服器通訊的移動使用者端。
  • 雙向流和整合驗證:雙向流和完全整合的基於HTTP/2傳輸的可插拔身份驗證。
  • 分層設計:支援擴充套件。身份驗證、負載平衡、紀錄檔記錄和監控等

四種服務方法

proto中rpc業務實際上是一個函數,由伺服器端重寫(overwrite)的函數,根據rpc函數的入參和出參簡單分為普通RPC、伺服器端流RPC、使用者端流RPC、雙端流RPC。

  • 一元rpc:其中使用者端向伺服器傳送單個請求並獲得單個響應,就像普通的函數呼叫一樣。
    • 一旦客戶機呼叫了存根方法,伺服器就會收到RPC呼叫的通知,其中包含該呼叫的客戶機後設資料、方法名稱和指定的截止日期(如果適用)。
    • 然後,伺服器可以直接傳送回自己的初始後設資料(必須在任何響應之前傳送),或者等待使用者端的請求訊息。首先發生的是特定於應用程式的。
    • 一旦伺服器獲得了客戶機的請求訊息,它就會執行建立和填充響應所需的任何工作。然後將響應(如果成功)連同狀態詳細資訊(狀態程式碼和可選狀態訊息)以及可選的尾隨後設資料一起返回給客戶機。
    • 如果響應狀態為OK,則客戶機將獲得響應,從而完成客戶機端的呼叫。
rpc SayHello(HelloRequest) returns (HelloResponse);
  • 伺服器流式rpc:其中使用者端向伺服器傳送請求並獲得讀取訊息序列的流。使用者端從返回的流中讀取,直到沒有更多的訊息。gRPC保證了單個RPC呼叫中的訊息排序。伺服器流RPC類似於一元RPC,不同之處在於伺服器在響應使用者端的請求時返回訊息流。傳送完所有訊息後,伺服器的狀態詳細資訊(狀態碼和可選狀態訊息)和可選的尾隨後設資料被傳送到使用者端。這就完成了伺服器端的處理。客戶機在擁有所有伺服器訊息後完成。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
  • 使用者端流式rpc:其中使用者端寫入訊息序列並將它們傳送到伺服器,同樣使用提供的流。一旦客戶機完成了訊息的寫入,它將等待伺服器讀取訊息並返回其響應。gRPC再次保證了單個RPC呼叫中的訊息排序。使用者端流式RPC類似於一元RPC,不同之處是使用者端向伺服器傳送訊息流而不是單個訊息。伺服器用一條訊息(連同它的狀態詳細資訊和可選的尾隨後設資料)進行響應,通常是在它接收到所有客戶機的訊息之後,但不一定是這樣。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
  • 雙向流式rpc:其中雙方使用讀寫流傳送訊息序列。這兩個流獨立執行,因此使用者端和伺服器可以按照自己喜歡的順序進行讀寫:例如,伺服器可以等待接收到所有使用者端訊息後再寫入響應,或者它可以交替地讀取訊息然後寫入訊息,或者其他一些讀寫組合,保留每個流中的訊息順序。在雙向流RPC中,呼叫由呼叫方法的客戶機和接收客戶機後設資料、方法名稱和截止日期的伺服器發起。伺服器可以選擇發回其初始後設資料,或者等待使用者端開始流式傳輸訊息。使用者端和伺服器端流處理是特定於應用程式的。由於這兩個流是獨立的,使用者端和伺服器可以以任何順序讀寫訊息。例如,伺服器可以等到接收到使用者端的所有訊息後再寫入訊息,或者伺服器收到請求,然後發回響應,然後使用者端根據響應傳送另一個請求,以此類推。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

實戰

環境設定

  • 安裝Protobuf
# 下載最新版本23.2的protoc,這個是protobuf程式碼生成工具,通過proto檔案生成對應的程式碼,根據自己作業系統下載相應檔案,這裡以windows 64位元系統為例
wget https://github.com/protocolbuffers/protobuf/releases/download/v23.2/protoc-23.2-win64.zip
# 解壓並放在windows本地目錄,並設定在Path路徑下如D:\Program Files\protoc-23.2-win64\bin,在windows下命令列執行protoc --version檢查是否安裝設定正確

  • 使用go get獲取grpc的官方軟體包
# 建立go專案grpc-demo,在GoLand IDE編寫,並通過下面命令安裝grpc核心庫protoc,可以GoLand IDE安裝protoc外掛,實現語法高亮
go get google.golang.org/grpc

  • 安裝go的protoc程式碼生成工具。上面安裝的是protocol編譯器,它可以生成多種開發語言程式碼,這裡使用go語言的工具是protoc-gen-go
# 在實際開發中最好指定具體的版本,這裡是演示使用就直接用latest,在命令列中執行下面兩條命令
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

在GOPATH目錄下的bin目錄就已經有剛剛安裝的兩個檔案

proto檔案

預設情況下gRPC使用Protocol Buffers(儘管它可以與JSON等其他資料格式一起使用),Protocol Buffers是Google公司開發的一種跨語言和平臺的序列化資料結構的方式,是一個靈活的、高效的用於序列化資料的協定。使用協定緩衝區時的第一步是定義要在proto檔案中序列化的資料的結構:這是一個擴充套件名為.proto的普通文字檔案。協定緩衝區資料被結構化為訊息,其中每個訊息都是包含一系列稱為欄位的名稱-值對的資訊的小邏輯記錄。在普通的原型檔案中定義gRPC服務,使用RPC方法引數和返回型別指定為協定緩衝區訊息。一般來說,雖然可以使用proto2(當前預設協定緩衝區版本),但建議將proto3與gRPC一起使用,因為它允許您使用所有gRPC支援的語言,並且避免了proto2使用者端與proto3伺服器通訊的相容性問題。

  • message:用於在protobuf中定義訊息型別,而訊息就是需要進行傳輸的資料格式,類似於go中的struct,在訊息中的資料欄位由欄位型別、欄位名稱、訊息號,一個proto檔案中可以定義多個訊息型別,也即是多服務。
  • 訊息號:在訊息體的定義中,每個欄位都必須有一個唯一的標識號,範圍是[1,2^29-1]
  • 欄位規則
    • required:預設規則,訊息體中必填欄位,不設定會導致編碼解碼的異常
    • optional:訊息體中可選欄位
    • repeated:訊息體中可重複欄位,重複的值的順序會被保留,在go中重複的欄位會定義為切片型別。
  • 欄位對映

  • 預設值

  • 巢狀訊息:可以在其它訊息型別中定義,使用訊息型別,也可以使用外部定義的訊息。
  • import使用:import用於匯入其它的proto檔案。也即是類似有common.proto、user.proto、order.proto,在user.proto、order.proto可以通過import common.proto實現多proto檔案之類參照管理。
  • any任意型別:需要匯入any.proto,屬性使用google.protobuf.Any定義

簡單RPC

簡單RPC也叫一元RPC,其中使用者端向伺服器傳送單個請求並獲得單個響應,就像普通的函數呼叫一樣。在go專案的src目錄下建立simple目錄,在simple建立proto目錄,在建立user.proto

// 語法版本,指定使用proto3
syntax = "proto3";

// 指定生成的go_package,生成的go程式碼使用什麼包package proto
option go_package = ".;proto";

// 服務定義,此處rpc服務的定義,一定要從伺服器端的角度考慮,即接受請求,處理請求並返回響應的一端
service UserService {
  // 遠端呼叫方法定義
  rpc GetUser(UserRequest) returns (UserResponse) {}
}

// 包含使用者編號的請求訊息
message UserRequest {
  // 每個字典=最後序號1為唯一的標識號,必填
  int32 id = 1;
}

// 包含訊息內容的響應訊息
message UserResponse {
  int32 id = 1;
  string name = 2;
}

在命令列中執行如下操作用於生成go的程式碼檔案

# 進入simple目錄
cd src/simple
# 執行protoc命令
protoc --go_out=. --go-grpc_out=. proto/user.proto

如果client和server不在同一工程專案,proto/user.pb.go和proto/user_grpc.pb.go需要都複製到對應client和server相應專案下,這裡先以此例說明。執行後生成proto/user.pb.go和proto/user_grpc.pb.go兩個檔案,其中包含用於填充、序列化和檢索UserRequest和UserResponse訊息型別的程式碼,生成客戶機和伺服器程式碼。

  • 伺服器端流程及程式碼

    • 建立gRPC Server物件,可以簡單理解為Server端抽象物件。
    • 將server(其包含被呼叫的伺服器端介面)註冊到gRPC Server的內部註冊中心,這樣可以在接收到請求時,通過內部的服務發現,發現該伺服器端介面並轉接進行邏輯處理。
    • 建立Listen,監聽TCP埠。
    • gRPC Server開始Listen、Accept,直到Stop。

    將proto檔案下的user.pb.go和user_grpc.pb.go複製一份放在src/simple/server目錄,在src/simple/server建立main.go檔案,內容如下

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"grpc-demo/src/simple/proto"
	"net"
)

type server struct {
	proto.UnimplementedUserServiceServer
}

func (s *server) GetUser(ctx context.Context, req *proto.UserRequest) (*proto.UserResponse, error) {
	// 伺服器端介面實現的業務邏輯
	fmt.Println("client端遠端呼叫成功......, 當前請求傳入id引數為", req.GetId())
	return &proto.UserResponse{
		Id:   req.GetId(),
		Name: "itxiaoshen",
	}, nil
}

func main() {
	//1. 開啟埠
	listen, _ := net.Listen("tcp", ":7070")

	//2. 建立grpc服務
	grpcServer := grpc.NewServer()

	//3. 將編寫好的服務註冊到grpc
	proto.RegisterUserServiceServer(grpcServer, &server{})

	//4. 啟動服務
	err := grpcServer.Serve(listen)
	if err != nil {
		fmt.Printf("failed to server: %v", err)
		return
	}
}
  • 使用者端流程及程式碼
    • 建立與給定目標(伺服器端)的連線互動。
    • 建立對應Client物件。
    • 傳送RPC請求,等待同步響應,得到回撥後返回響應結果。
    • 輸出響應結果。

將proto檔案下的user.pb.go和user_grpc.pb.go複製一份放在src/simple/client目錄,在src/simple/client建立main.go檔案,內容如下

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "grpc-demo/src/simple/server/proto"
	"log"
)

func main() {
	//1. 與Server建立連線,此處禁用安全傳輸,這裡沒有使用加密驗證
	conn, err := grpc.Dial("127.0.0.1:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close() //延時關閉連線

	//2. 與對應服務建立連線
	client := pb.NewUserServiceClient(conn)

	//3. 執行grpc呼叫
	resp, _ := client.GetUser(context.Background(), &pb.UserRequest{Id: 1})
	fmt.Println("client get user,id=", resp.GetId(), ",name=", resp.GetName())
}

啟動server和client,使用者端正確返回結果,伺服器端也列印請求紀錄檔。

Token認證

gRPC提供了一個介面PerRPCCredentials,介面位於credentials包下,介面中有兩個方法,方法需要由使用者端來實現

client的main.go

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "grpc-demo/src/simple/server/proto"
	"log"
)

type ClientTokenAuth struct {
}

func (c ClientTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"appId":  "itxs",
		"appKey": "11223344",
	}, nil
}

func (c ClientTokenAuth) RequireTransportSecurity() bool {
	return false
}

func main() {
	var opts []grpc.DialOption
	opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) //這裡我們不使用TLS,因此這裡傳入空
	opts = append(opts, grpc.WithPerRPCCredentials(new(ClientTokenAuth)))         //傳入我們自定義的驗證方式【Token】
	conn, err := grpc.Dial("127.0.0.1:7070", opts...)

	//1. 與Server建立連線,此處禁用安全傳輸,這裡沒有使用加密驗證
	//conn, err := grpc.Dial("127.0.0.1:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close() //延時關閉連線

	//2. 與對應服務建立連線
	client := pb.NewUserServiceClient(conn)

	//3. 執行grpc呼叫
	resp, err := client.GetUser(context.Background(), &pb.UserRequest{Id: 1})
	if err != nil {
		fmt.Println("client get user error=", err.Error())
	} else {
		fmt.Println("client get user,id=", resp.GetId(), ",name=", resp.GetName())
	}

}

server的main.go

package main

import (
	"context"
	"errors"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"grpc-demo/src/simple/proto"
	"net"
)

type server struct {
	proto.UnimplementedUserServiceServer
}

func (s *server) GetUser(ctx context.Context, req *proto.UserRequest) (*proto.UserResponse, error) {
	fmt.Println("接收client端遠端呼叫請求")
	//獲取使用者端傳入的後設資料資訊
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, errors.New("未傳輸token")
	}
	var appId string
	var appKey string
	if v, ok := md["appid"]; ok {
		appId = v[0]
	}
	if v, ok := md["appkey"]; ok {
		appKey = v[0]
	}
	if appId != "itxs" || appKey != "11223344" {
		fmt.Println("token 不正確")
		return nil, errors.New("token 不正確")
	}
	fmt.Println("token 驗證正確")
	// 伺服器端介面實現的業務邏輯
	fmt.Println("client端遠端呼叫成功......, 當前請求傳入id引數為", req.GetId())
	return &proto.UserResponse{
		Id:   req.GetId(),
		Name: "itxiaoshen",
	}, nil
}

func main() {
	//1. 開啟埠
	listen, _ := net.Listen("tcp", ":7070")

	//2. 建立grpc服務
	grpcServer := grpc.NewServer()

	//3. 將編寫好的服務註冊到grpc
	proto.RegisterUserServiceServer(grpcServer, &server{})

	//4. 啟動服務
	err := grpcServer.Serve(listen)
	if err != nil {
		fmt.Printf("failed to server: %v", err)
		return
	}
}

啟動server和client,返回正確的結果,反之如果使用者端輸入不正確appId或appKey則會返回token 不正確。

gRPC將各種認證方式濃縮到一個憑證(credentials)上,可以單獨使用一種拼爭,比如只使用TLS或者只使用自定義憑證,也可以多種憑證組合,gRPC提供統一的gRPC驗證機制,使得研發人員使用方便,這也是gRPC設計的巧妙之處。

伺服器流式RPC

伺服器流式RPC這裡使用檔案下載的案例來演示,建立file.proto檔案

// 語法版本,指定使用proto3
syntax = "proto3";

// 指定生成的go_package,生成的go程式碼使用什麼包package proto
option go_package = "./proto;proto";

// 服務定義,此處rpc服務的定義,一定要從伺服器端的角度考慮,即接受請求,處理請求並返回響應的一端
service FileService {
  // 檔案下載遠端呼叫方法定義
  rpc DownLoad(FileRequest) returns (stream FileResponse) {}
}

// 包含檔名的檔案請求訊息
message FileRequest {
  // 每個字典=最後序號1為唯一的標識號,必填
  string name = 1;
}

// 包含訊息內容的響應訊息
message FileResponse {
  string name = 1;
  bytes content = 2;
}

在命令列中執行如下操作用於生成go的程式碼檔案

# 進入simple目錄
cd src/stream
# 執行protoc命令
protoc --go_out=. --go-grpc_out=. proto/file.proto

建立server_stream_server.go實現伺服器端檔案下載

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"grpc-demo/src/stream/proto"
	"io"
	"log"
	"net"
	"os"
)

type FileService struct {
	proto.UnimplementedFileServiceServer
}

func (FileService) DownLoad(req *proto.FileRequest, stream proto.FileService_DownLoadServer) error {
	fmt.Println(req)
	file, err := os.Open("src\\stream\\static\\winutils-master.zip")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	for {
		buf := make([]byte, 1024)
		_, err = file.Read(buf)
		if err == io.EOF {
			break
		}

		if err != nil {
			panic(err)
		}
		stream.Send(&proto.FileResponse{
			Content: buf,
		})
	}
	return nil
}

func main() {
	listen, _ := net.Listen("tcp", ":7070")
	// 建立grpc服務
	grpcServer := grpc.NewServer()
	// 註冊服務
	proto.RegisterFileServiceServer(grpcServer, &FileService{})
	// 啟動服務
	err := grpcServer.Serve(listen)
	if err != nil {
		log.Fatal("服務啟動失敗:", err)
		return
	}
}

建立server_stream_client.go實現使用者端檔案下載

package main

import (
	"bufio"
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc-demo/src/stream/proto"
	"io"
	"log"
	"os"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("連線失敗:", err)
		return
	}
	defer conn.Close()

	// 建立連線
	client := proto.NewFileServiceClient(conn)
	// 執行rpc呼叫
	serverStream, err := client.DownLoad(context.Background(), &proto.FileRequest{Name: "hello.zip"})
	if err != nil {
		log.Fatalln("獲取流出錯", err)
	}

	file, err := os.OpenFile("winutils-master-new.zip", os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	writer := bufio.NewWriter(file)

	for {
		resp, err := serverStream.Recv()
		if err != nil {
			if err == io.EOF {
				fmt.Println("使用者端資料接收完成")
				err := serverStream.CloseSend()
				if err != nil {
					log.Fatal(err)
				}
				break
			}
		}
		writer.Write(resp.Content)
	}
	writer.Flush()
}

執行伺服器端和使用者端,最終按照預期下載檔案

使用者端流式RPC

使用者端流式RPC這裡使用檔案上傳的案例來演示,修改file.proto檔案,增加UpFileService相關內容

// 語法版本,指定使用proto3
syntax = "proto3";

// 指定生成的go_package,生成的go程式碼使用什麼包package proto
option go_package = "./proto;proto";

// 服務定義,此處rpc服務的定義,一定要從伺服器端的角度考慮,即接受請求,處理請求並返回響應的一端
service FileService {
  // 檔案下載遠端呼叫方法定義
  rpc DownLoad(FileRequest) returns (stream FileResponse) {}
}

// 服務定義,此處rpc服務的定義,一定要從伺服器端的角度考慮,即接受請求,處理請求並返回響應的一端
service UpFileService {
  // 檔案下載遠端呼叫方法定義
  rpc UpLoad(stream UpFileRequest) returns (UpFileResponse) {}
}

// 包含檔名的檔案請求訊息
message FileRequest {
  // 每個字典=最後序號1為唯一的標識號,必填
  string name = 1;
}

// 包含訊息內容的響應訊息
message FileResponse {
  string name = 1;
  bytes content = 2;
}

// 包含檔名的檔案請求訊息
message UpFileRequest {
  string name = 1;
  bytes content = 2;
}

// 包含檔名的檔案請求訊息
message UpFileResponse {
  string state = 1;
}

在命令列中執行如下操作用於生成go的程式碼檔案

# 進入simple目錄
cd src/stream
# 執行protoc命令
protoc --go_out=. --go-grpc_out=. proto/file.proto

建立client_stream_server.go實現伺服器端檔案下載

package main

import (
	"bufio"
	"google.golang.org/grpc"
	"grpc-demo/src/stream/proto"
	"io"
	"log"
	"net"
	"os"
)

type UpFileService struct {
	proto.UnimplementedUpFileServiceServer
}

func (UpFileService) UpLoad(stream proto.UpFileService_UpLoadServer) error {
	file, err := os.OpenFile("src/stream/static/apache-maven-3.8.6-bin-new.zip", os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	writer := bufio.NewWriter(file)

	for {
		resp, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
		}
		writer.Write(resp.Content)
	}

	writer.Flush()

	stream.SendAndClose(&proto.UpFileResponse{
		State: "success",
	})

	return nil
}

func main() {
	listen, _ := net.Listen("tcp", ":7070")
	// 建立grpc服務
	grpcServer := grpc.NewServer()
	// 註冊服務
	proto.RegisterUpFileServiceServer(grpcServer, &UpFileService{})
	// 啟動服務
	err := grpcServer.Serve(listen)
	if err != nil {
		log.Fatal("服務啟動失敗:", err)
		return
	}
}

建立client_stream_client.go實現使用者端檔案下載

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc-demo/src/stream/proto"
	"io"
	"log"
	"os"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("連線失敗:", err)
		return
	}
	defer conn.Close()

	// 建立連線
	client := proto.NewUpFileServiceClient(conn)
	// 執行rpc呼叫
	clientStream, err := client.UpLoad(context.Background())
	if err != nil {
		log.Fatalln("獲取流出錯", err)
	}

	file, err := os.Open("g:\\apache-maven-3.8.6-bin.zip")
	if err != nil {
		log.Fatalln(err)
	}
	defer file.Close()

	for {
		buf := make([]byte, 1024)
		_, err = file.Read(buf)
		if err == io.EOF {
			break
		}

		if err != nil {
			panic(err)
		}
		clientStream.Send(&proto.UpFileRequest{
			Name:    "apache-maven-3.8.6-bin.zip",
			Content: buf,
		})
	}

	resp, err := clientStream.CloseAndRecv()
	fmt.Println(resp, err)
}

執行伺服器端和使用者端,最終按照預期上傳檔案

雙向流式RPC

雙向流式RPC這裡類似聊天的場景,雙方可以隨時收發,修改file.proto檔案,增加ChatService相關內容

// 語法版本,指定使用proto3
syntax = "proto3";

// 指定生成的go_package,生成的go程式碼使用什麼包package proto
option go_package = "./proto;proto";

// 服務定義,此處rpc服務的定義,一定要從伺服器端的角度考慮,即接受請求,處理請求並返回響應的一端
service FileService {
  // 檔案下載遠端呼叫方法定義
  rpc DownLoad(FileRequest) returns (stream FileResponse) {}
}

// 服務定義,此處rpc服務的定義,一定要從伺服器端的角度考慮,即接受請求,處理請求並返回響應的一端
service UpFileService {
  // 檔案下載遠端呼叫方法定義
  rpc UpLoad(stream UpFileRequest) returns (UpFileResponse) {}
}

service ChatService {
  // 檔案下載遠端呼叫方法定義,文字聊天
  rpc TextChat(stream TextRequest) returns (stream TextResponse) {}
}

// 包含檔名的檔案請求訊息
message FileRequest {
  // 每個字典=最後序號1為唯一的標識號,必填
  string name = 1;
}

// 包含訊息內容的響應訊息
message FileResponse {
  string name = 1;
  bytes content = 2;
}

// 包含檔名的檔案請求訊息
message UpFileRequest {
  string name = 1;
  bytes content = 2;
}

// 包含檔名的檔案請求訊息
message UpFileResponse {
  string state = 1;
}

// 包含檔名的檔案請求訊息
message TextRequest {
  // 每個字典=最後序號1為唯一的標識號,必填
  string message = 1;
}

// 包含訊息內容的響應訊息
message TextResponse {
  string message = 1;
}

在命令列中執行如下操作用於生成go的程式碼檔案

# 進入simple目錄
cd src/stream
# 執行protoc命令
protoc --go_out=. --go-grpc_out=. proto/file.proto

建立both_stream_server.go實現伺服器端檔案下載

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"grpc-demo/src/stream/proto"
	"log"
	"net"
)

type ChatService struct {
	proto.UnimplementedChatServiceServer
}

func (ChatService) TextChat(stream proto.ChatService_TextChatServer) error {
	for i := 0; i < 10; i++ {
		req, _ := stream.Recv()
		fmt.Println(req)
		stream.Send(&proto.TextResponse{
			Message: fmt.Sprintf("server send world to client!i=%d", i),
		})
	}
	return nil
}

func main() {
	listen, _ := net.Listen("tcp", ":7070")
	// 建立grpc服務
	grpcServer := grpc.NewServer()
	// 註冊服務
	proto.RegisterChatServiceServer(grpcServer, &ChatService{})
	// 啟動服務
	err := grpcServer.Serve(listen)
	if err != nil {
		log.Fatal("服務啟動失敗:", err)
		return
	}
}

建立both_stream_client.go實現使用者端檔案下載

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc-demo/src/stream/proto"
	"log"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("連線失敗:", err)
		return
	}
	defer conn.Close()

	// 建立連線
	client := proto.NewChatServiceClient(conn)
	// 執行rpc呼叫
	stream, err := client.TextChat(context.Background())
	if err != nil {
		log.Fatalln("獲取流出錯", err)
	}

	for i := 0; i < 10; i++ {
		stream.SendMsg(&proto.TextRequest{
			Message: fmt.Sprintf("client send hello to server!i=%d", i),
		})
		resp, err := stream.Recv()
		fmt.Println(resp, err)
	}

}

執行伺服器端和使用者端,最終按照預期實現雙方文字聊天訊息傳送

  • 本人部落格網站IT小神 www.itxiaoshen.com