Python_頭條專案RPC(8)

2020-08-08 20:16:16

RPC簡介

1. 什麼是RPC

遠端過程呼叫(英語:Remote Procedure Call,縮寫爲 RPC,也叫遠端程式呼叫)是一個計算機通訊協定。該協定允許執行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地爲這個互動作用程式設計。如果涉及的軟體採用物件導向程式設計,那麼遠端過程呼叫亦可稱作遠端呼叫遠端方法呼叫

2. 背景與用途

在單台計算機中,我們可以通過程式呼叫來傳遞控制和數據;或者說通過程式呼叫,我們可以將多個程式組成一個整體來實現某個功能。

如果將這種呼叫機制 機製推廣到多臺彼此間可以進行網路通訊的計算機,由多臺計算機中的多個程式組成一個整體來實現某個功能,這也是可以的。呼叫的一方(發起遠端過程呼叫,然後呼叫這方的環境掛起,參數通過網路傳遞給被呼叫方,被呼叫的一方執行程式,當程式執行完成後,產生的結果再通過網路回傳給呼叫的一方,呼叫的一方恢復繼續執行。這樣一種原型思想,就是我們所說的RPC遠端過程呼叫。

RPC這種思想最早可以追溯到1976年,RPC的發展到今天已經40年有餘了。

如今的計算機應用中,單機效能上很難承受住產品的壓力,需要不斷擴充多臺機器來提升整體的效能。同時爲了充分利用這些叢集裡的計算機,需要對其從架構上進行劃分,以提供不同的服務,服務間相互呼叫完成整個產品的功能。RPC就能幫助我們解決這些服務間的資訊傳遞和呼叫。

3. 概念說明

關於RPC的概念,我們可以從廣義和狹義來分別進行理解。

廣義

我們可以將所有通過網路來進行通訊呼叫的實現統稱爲RPC。

按照這樣來理解的話,那我們發現HTTP其實也算是一種RPC實現。

狹義

區別於HTTP的實現方式,在傳輸的數據格式上和傳輸的控制上獨立實現。比如在機器間通訊傳輸的數據不採用HTTP協定的方式(分爲起始行、header、body三部份),而是使用自定義格式的二進制方式。

我們更多時候談到的RPC都是指代這種狹義上的理解。

4. 優缺點

相比於傳統HTTP的實現而言:

優點

  • 效率高
  • 發起RPC呼叫的一方,在編寫程式碼時可忽略RPC的具體實現,如同編寫本地函數呼叫一樣

缺點

  • 通用性不如HTTP好 因爲傳輸的數據不是HTTP協定格式,所以呼叫雙方需要專門實現的通訊庫,對於不同的程式設計開發語言,都要有相關實現。而HTTP作爲一個標準協定,大部分的語言都已有相關的實現,通用性更好。

HTTP更多的面向使用者與產品伺服器的通訊。

RPC更多的面向產品內部伺服器間的通訊。 thrift

RPC結構

RPC的設計思想是力圖使遠端呼叫中的通訊細節對於使用者透明,呼叫雙方無需關心網路通訊的具體實現。因而實現RPC要進行一定的封裝。

RPC原理上是按如下結構流程進行實現的。

流程:

  1. 呼叫者(Caller, 也叫用戶端、Client)以本地呼叫的方式發起呼叫;
  2. Client stub(用戶端存根,可理解爲輔助助手)收到呼叫後,負責將被呼叫的方法名、參數等打包編碼成特定格式的能進行網路傳輸的訊息體;
  3. Client stub將訊息體通過網路發送給對端(伺服器端)
  4. Server stub(伺服器端存根,同樣可理解爲輔助助手)收到通過網路接收到訊息後按照相應格式進行拆包解碼,獲取方法名和參數;
  5. Server stub根據方法名和參數進行本地呼叫;
  6. 被呼叫者(Callee,也叫Server)本地呼叫執行後將結果返回給server stub;
  7. Server stub將返回值打包編碼成訊息,並通過網路發送給對端(用戶端);
  8. Client stub收到訊息後,進行拆包解碼,返回給Client;
  9. Client得到本次RPC呼叫的最終結果。

================================

gRPC

簡介

  • gRPC是由Google公司開源的高效能RPC框架。

  • gRPC支援多語言

    gRPC原生使用C、Java、Go進行了三種實現,而C語言實現的版本進行封裝後又支援C++、C#、Node、ObjC、 Python、Ruby、PHP等開發語言

  • gRPC支援多平臺

    支援的平臺包括:Linux、Android、iOS、MacOS、Windows

  • gRPC的訊息協定使用Google自家開源的Protocol Buffers協定機制 機製(proto3) 序列化

  • gRPC的傳輸使用HTTP/2標準,支援雙向流和連線多路複用

架構

C語言實現的gRPC支援多語言,其架構如下

使用方法

  1. 使用Protocol Buffers(proto3)的IDL介面定義語言定義介面服務,編寫在文字檔案(以.proto爲後綴名)中。
  2. 使用protobuf編譯器生成伺服器和用戶端使用的stub程式碼
  3. 編寫補充伺服器和用戶端邏輯程式碼

======================================

Protocol Buffers

Protocol Buffers 是一種與語言無關,平臺無關的可延伸機制 機製,用於序列化結構化數據。使用Protocol Buffers 可以一次定義結構化的數據,然後可以使用特殊生成的原始碼輕鬆地在各種數據流中使用各種語言編寫和讀取結構化數據。

現在有許多框架等在使用Protocol Buffers。gRPC也是基於Protocol Buffers。 Protocol Buffers 目前有2和3兩個版本號。

在gRPC中推薦使用proto3版本。

1 文件結構

1) Protocol Buffers版本

Protocol Buffers文件的第一行非註釋行,爲版本申明,不填寫的話預設爲版本2。

syntax = "proto3";
或者
syntax = "proto2";

2)Package包

Protocol Buffers 可以宣告package,來防止命名衝突。 Packages是可選的。

package foo.bar;
message Open { ... }

使用的時候,也要加上名稱空間,

message Foo {
  ...
  foo.bar.Open open = 1;
  ...
}

注意:對於Python而言,package會被忽略處理,因爲Python中的包是以檔案目錄來定義的。

3)匯入

Protocol Buffers 中可以匯入其它檔案訊息等,與Python的import類似。

import 「myproject/other_protos.proto」;

4)定義各種訊息和服務

訊息messge是用來定義數據的,服務service是用來gRPC的方法的。

2 註釋

Protocol Buffers 提供以下兩種註釋方式。

// 單行註釋
//
//
//
/* 
多行註釋 



*/

3 數據型別

3.1 基本數據型別

.proto 說明 Python
double   float
float   float
int32 使用變長編碼,對負數編碼效率低, 如果你的變數可能是負數,可以使用sint32 int
int64 使用變長編碼,對負數編碼效率低,如果你的變數可能是負數,可以使用sint64 int/long
uint32 使用變長編碼 int/long
uint64 使用變長編碼 int/long
sint32 使用變長編碼,帶符號的int型別,對負數編碼比int32高效 int
sint64 使用變長編碼,帶符號的int型別,對負數編碼比int64高效 int/long
fixed32 4位元組編碼, 如果變數經常大於2^{28} 的話,會比uint32高效 int
fixed64 8位元組編碼, 如果變數經常大於2^{56} 的話,會比uint64高效 int/long
sfixed32 4位元組編碼 int
sfixed64 8位元組編碼 int/long
bool   bool
string 必須包含utf-8編碼或者7-bit ASCII text str
bytes 任意的位元組序列 str

3.2 列舉

在 Proto Buffers 中,我們可以定義列舉和列舉型別,

enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 6;
}
Corpus corpus = 4;

列舉定義在一個訊息內部或訊息外部都是可以的,如果列舉是 定義在 message 內部,而其他 message 又想使用,那麼可以通過 MessageType.EnumType 的方式參照。

定義列舉的時候,我們要保證第一個列舉值必須是0,列舉值不能重複,除非使用 option allow_alias = true 選項來開啓別名。

enum EnumAllowingAlias {
    option allow_alias = true;
    UNKNOWN = 0;
    STARTED = 1;
    RUNNING = 1;
}

列舉值的範圍是32-bit integer,但因爲列舉值使用變長編碼,所以不推薦使用負數作爲列舉值,因爲這會帶來效率問題。

4 訊息型別

Protocol Buffers使用message定義訊息數據。在Protocol Buffers中使用的數據都是通過message訊息數據封裝基本型別數據或其他訊息數據,對應Python中的類。

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

4.1 欄位編號

訊息定義中的每個欄位都有唯一的編號。這些欄位編號用於以訊息二進制格式標識欄位,並且在使用訊息型別後不應更改。 請注意,1到15範圍內的欄位編號需要一個位元組進行編碼,包括欄位編號和欄位型別16到2047範圍內的欄位編號佔用兩個位元組。因此,您應該爲非常頻繁出現的訊息元素保留數位1到15。請記住爲將來可能新增的常用元素留出一些空間。

最小的標識號可以從1開始,最大到2^29 - 1,或 536,870,911。不可以使用其中的[19000-19999]的標識號, Protobuf協定實現中對這些進行了預留。如果非要在.proto檔案中使用這些預留標識號,編譯時就會報警。同樣你也不能使用早期保留的標識號。

4.2 指定欄位規則

訊息欄位可以是以下之一:

  • singular:格式良好的訊息可以包含該欄位中的零個或一個(但不超過一個)。

  • repeated:此欄位可以在格式良好的訊息中重複任意次數(包括零)。將保留重複值的順序。對應Python的列表。

      message Result {
        string url = 1;
        string title = 2;
        repeated string snippets = 3;
      }
    

4.3 新增更多訊息型別

可以在單個.proto檔案中定義多個訊息型別。

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

message SearchResponse {
 ...
}

4.4 保留欄位

保留變數不被使用

如果通過完全刪除欄位或將其註釋來更新訊息型別,則未來使用者可以在對型別進行自己的更新時重用欄位編號。如果以後載入相同的舊版本,這可能會導致嚴重問題,包括數據損壞,隱私錯誤等。確保不會發生這種情況的一種方法是指定已刪除欄位的欄位編號(或名稱)reserved。如果將來的任何使用者嘗試使用這些欄位識別符號,protobuf編譯器將會報錯。

message Foo {
  reserved 2, 15, 9 to 11;
  reserved "foo", "bar";
}

4.5 預設值

解析訊息時,如果編碼訊息不包含特定的單數元素,則解析物件中的相應欄位將設定爲該欄位的預設值。這些預設值是特定於型別的:

  • 對於字串,預設值爲空字串。
  • 對於位元組,預設值爲空位元組。
  • 對於bools,預設值爲false。
  • 對於數位型別,預設值爲零。
  • 對於列舉,預設值是第一個定義的列舉值,該值必須爲0。
  • 對於訊息欄位,未設定該欄位。它的確切值取決於語言。
  • 重複欄位的預設值爲空(通常是相應語言的空列表)。

4.6 巢狀型別

你可以在其他訊息型別中定義、使用訊息型別,在下面 下麪的例子中,Result訊息就定義在SearchResponse訊息內,如:

message SearchResponse {
  message Result {
    string url = 1;
    string title = 2;
    repeated string snippets = 3;
  }
  repeated Result results = 1;
}

如果要在其父訊息型別之外重用此訊息型別,使用

SearchResponse.Result

5 map對映

如果要在數據定義中建立關聯對映,Protocol Buffers提供了一種方便的語法:

map< key_type, value_type> map_field = N ;

其中key_type可以是任何整數或字串型別。請注意,列舉不是有效的key_type。value_type可以是除map對映型別外的任何型別。

例如,如果要建立專案對映,其中每條Project訊息都與字串鍵相關聯,則可以像下面 下麪這樣定義它:

map<string, Project> projects = 3 ;
  • map的欄位可以是repeated。
  • 序列化後的順序和map迭代器的順序是不確定的,所以你不要期望以固定順序處理map
  • 當爲.proto檔案產生生成文字格式的時候,map會按照key 的順序排序,數值化的key會按照數值排序。
  • 從序列化中解析或者融合時,如果有重複的key則後一個key不會被使用,當從文字格式中解析map時,如果存在重複的key,則解析可能會失敗。
  • 如果爲對映欄位提供鍵但沒有值,則欄位序列化時的行爲取決於語言。在Python中,使用型別的預設值。

6 oneof

如果你的訊息中有很多可選欄位, 並且同時至多一個欄位會被設定, 你可以加強這個行爲,使用oneof特性節省記憶體。

爲了在.proto定義oneof欄位, 你需要在名字前面加上oneof關鍵字, 比如下面 下麪例子的test_oneof:

message SampleMessage {
  oneof test_oneof {
    string name = 4;
    SubMessage sub_message = 9;
  }
}

然後你可以增加oneof欄位到 oneof 定義中. 你可以增加任意型別的欄位, 但是不能使用repeated 關鍵字。

7 定義服務

Protocol Buffers使用service定義RPC服務。

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

注意:一個service中可定義多個方法。

========================================

推薦系統介面定義

介面原型

介面名稱: user_recommend

呼叫參數:

UserRequest:
    user_id       # 使用者id
    channel_id    # 頻道id
    article_num   # 推薦的文章數量
    time_stamp    # 推薦的時間戳

返回數據:

ArticleResponse:
    expousre         # 曝光埋點數據
    time_stamp       # 推薦的時間戳
    recommends:      # 推薦結果
        article_id   # 文章id
        track:         # 關於文章的埋點數據
            click    # 使用者點選行爲的埋點參數
            collect  # 使用者收藏的埋點參數
            share    # 使用者分享的埋點參數
            read     # 使用者進入文章詳情的埋點參數

使用Protobuf 定義的介面如下

使用protobuf定義的介面檔案通常以proto作爲檔案後綴名

在toutiao-backend/common/rpc目錄下新建reco.proto檔案

syntax = "proto3";

message UserRequest {
    string user_id=1;
    int32 channel_id=2;
    int32 article_num=3;
    int64 time_stamp=4;
}

message Track {
    string click=1;
    string collect=2;
    string share=3;
    string read=4;
}

message Article {
    int64 article_id=1;
    Track track=2;
}

message ArticleResponse {
    string exposure=1;
    int64 time_stamp=2;
    repeated Article recommends=3;
}

service UserRecommend {
    rpc user_recommend(UserRequest) returns(ArticleResponse) {}
}

程式碼生成

安裝protobuf編譯器和grpc庫

pip install grpcio-tools

編譯生成程式碼

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto
  • -I表示搜尋proto檔案中被匯入檔案的目錄
  • --python_out表示儲存生成Python檔案的目錄,生成的檔案中包含介面定義中的數據型別
  • --grpc_python_out表示儲存生成Python檔案的目錄,生成的檔案中包含介面定義中的服務型別

在toutiao-backend/common/rpc目錄下執行上述命令,會自動生成如下兩個rpc呼叫輔助程式碼模組:

  • reco_pb2.py 儲存根據介面定義檔案中的數據型別生成的python類
  • reco_pb2_grpc.py 儲存根據介面定義檔案中的服務方法型別生成的python呼叫RPC方法

==========================================

補全伺服器端

爲了方便看到效果,我們編寫補全伺服器端程式碼。

注意:此處實際推薦的程式碼在後續推薦系統課程中會涉及到

在toutiao-backend/common/rpc目錄下新建server.py檔案

import reco_pb2
import reco_pb2_grpc
import grpc
from concurrent.futures import ThreadPoolExecutor
import time


# rpc介面定義中服務對應成Python的類
class UserRecommendService(reco_pb2_grpc.UserRecommendServicer):

    # 在介面定義的同名方法中補全,被呼叫時應該執行的邏輯
    def user_recommend(self, request, context):
        # request是呼叫的請求數據物件
        user_id = request.user_id
        channel_id = request.channel_id
        article_num = request.article_num
        time_stamp = request.time_stamp

        response = reco_pb2.ArticleResponse()
        response.exposure = 'exposure param'
        response.time_stamp = round(time.time()*1000)
        recommends = []
        for i in range(article_num):
            article = reco_pb2.Article()
            article.track.click = 'click param {}'.format(i+1)
            article.track.collect = 'collect param {}'.format(i+1)
            article.track.share = 'share param {}'.format(i+1)
            article.track.read = 'read param {}'.format(i+1)
            article.article_id = i+1
            recommends.append(article)
        response.recommends.extend(recommends)

        # 最終要返回一個呼叫結果
        return response


def serve():
    """
    rpc伺服器端啓動方法
    """
    # 建立一個rpc伺服器
    server = grpc.server(ThreadPoolExecutor(max_workers=10))

    # 向伺服器中新增被呼叫的服務方法
    reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server)

    # 微伺服器系結ip地址和埠
    server.add_insecure_port('127.0.0.1:8888')

    # 啓動rpc服務
    server.start()

    # start()不會阻塞,此處需要加上回圈睡眠 防止程式退出
    while True:
        time.sleep(10)


if __name__ == '__main__':
    serve()

編寫用戶端

在toutiao-backend/common/rpc目錄下新建client.py

import grpc
import reco_pb2
import reco_pb2_grpc
import time


def feed_articles(stub):
    # 構建rpc呼叫的呼叫參數
    user_request = reco_pb2.UserRequest()
    user_request.user_id = '1'
    user_request.channel_id = 1
    user_request.article_num = 10
    user_request.time_stamp = round(time.time()*1000)

    # 通過stub進行方法呼叫,並接收呼叫返回值
    ret = stub.user_recommend(user_request)
    print('ret={}'.format(ret))

def run():
    """
    rpc用戶端呼叫的方法
    """
    # 使用with語句連線rpc伺服器
    with grpc.insecure_channel('127.0.0.1:8888') as channel:
        # 建立呼叫rpc遠端服務的輔助物件stub
        stub = reco_pb2_grpc.UserRecommendStub(channel)
        # 通過stub進行rpc呼叫
        feed_articles(stub)

if __name__ == '__main__':
    run()

========================

頭條首頁新聞推薦介面編寫

在toutiao-backend/toutiao/resources/news/article.py中編寫

from rpc import reco_pb2, reco_pb2_grpc

class ArticleListResource(Resource):
    """
    獲取推薦文章列表數據
    """
    def _feed_articles(self, channel_id, timestamp, feed_count):
        """
        獲取推薦文章
        :param channel_id: 頻道id
        :param feed_count: 推薦數量
        :param timestamp: 時間戳
        :return: [{article_id, trace_params}, ...], timestamp
        """
        user_request = reco_pb2.UserRequest()
        user_request.user_id = g.user_id or 'annoy'
        user_request.channel_id = channel_id
        user_request.article_num = feed_count
        user_request.time_stamp = round(time.time() * 1000)

        stub = reco_pb2_grpc.UserRecommendStub(current_app.rpc_reco)
        ret = stub.user_recommend(user_request)
        return ret.recommends, ret.time_stamp

    def get(self):
        """
        獲取文章列表
        """
        qs_parser = RequestParser()
        qs_parser.add_argument('channel_id', type=parser.channel_id, required=True, location='args')
        qs_parser.add_argument('timestamp', type=inputs.positive, required=True, location='args')
        args = qs_parser.parse_args()
        channel_id = args.channel_id
        timestamp = args.timestamp
        per_page = constants.DEFAULT_ARTICLE_PER_PAGE_MIN
        try:
            feed_time = time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(time.time()))
        except Exception:
            return {'message': 'timestamp param error'}, 400

        results = []

        # 獲取推薦文章列表
        feeds, pre_timestamp = self._feed_articles(channel_id, timestamp, per_page)

        # 查詢文章
        for feed in feeds:
            article = cache_article.ArticleInfoCache(feed.article_id).get()
            if article:
                article['pubdate'] = feed_time
                article['trace'] = {
                    'click': feed.track.click,
                    'collect': feed.track.collect,
                    'share': feed.track.share,
                    'read': feed.track.read
                }
                results.append(article)

        return {'pre_timestamp': pre_timestamp, 'results': results}