Go語言RPC(模擬遠端過程呼叫)

2020-07-16 10:05:22
伺服器開發中會使用RPC(Remote Procedure Call,遠端過程呼叫)簡化進程間通訊的過程。RPC 能有效地封裝通訊過程,讓遠端的資料收發通訊過程看起來就像原生的函數呼叫一樣。

本例中,使用通道代替 Socket 實現 RPC 的過程。用戶端與伺服器執行在同一個進程,伺服器和用戶端在兩個 goroutine 中執行。

我們先給出完整程式碼,然後再詳細分析每一個部分。
package main

import (
    "errors"
    "fmt"
    "time"
)

// 模擬RPC用戶端的請求和接收訊息封裝
func RPCClient(ch chan string, req string) (string, error) {

    // 向伺服器傳送請求
    ch <- req

    // 等待伺服器返回
    select {
    case ack := <-ch: // 接收到伺服器返回資料
        return ack, nil
    case <-time.After(time.Second): // 超時
        return "", errors.New("Time out")
    }
}

// 模擬RPC伺服器端接收用戶端請求和回應
func RPCServer(ch chan string) {
    for {
        // 接收用戶端請求
        data := <-ch

        // 列印接收到的資料
        fmt.Println("server received:", data)

        // 反饋給用戶端收到
        ch <- "roger"
    }
}

func main() {

    // 建立一個無緩衝字串通道
    ch := make(chan string)

    // 並行執行伺服器邏輯
    go RPCServer(ch)

    // 用戶端請求資料和接收資料
    recv, err := RPCClient(ch, "hi")
    if err != nil {
        // 發生錯誤列印
        fmt.Println(err)
    } else {
        // 正常接收到資料
        fmt.Println("client received", recv)
    }

}

用戶端請求和接收封裝

下面的程式碼封裝了向伺服器請求資料,等待伺服器返回資料,如果請求方超時,該函數還會處理超時邏輯。

模擬 RPC 的程式碼:
// 模擬RPC用戶端的請求和接收訊息封裝
func RPCClient(ch chan string, req string) (string, error) {

    // 向伺服器傳送請求
    ch <- req

    // 等待伺服器返回
    select {
    case ack := <-ch:  // 接收到伺服器返回資料
        return ack, nil
    case <-time.After(time.Second):  // 超時
        return "", errors.New("Time out")
    }
}
程式碼說明如下:
  • 第 5 行,模擬 socket 向伺服器傳送一個字串資訊。伺服器接收後,結束阻塞執行下一行。
  • 第 8 行,使用 select 開始做多路複用。注意,select 雖然在寫法上和 switch 一樣,都可以擁有 case 和 default。但是 select 關鍵字後面不接任何語句,而是將要復用的多個通道語句寫在每一個 case 上,如第 9 行和第 11 行所示。
  • 第 11 行,使用了 time 包提供的函數 After(),從字面意思看就是多少時間之後,其引數是 time 包的一個常數,time.Second 表示 1 秒。time.After 返回一個通道,這個通道在指定時間後,通過通道返回當前時間。
  • 第 12 行,在超時時,返回超時錯誤。

RPCClient() 函數中,執行到 select 語句時,第 9 行和第 11 行的通道操作會同時開啟。如果第 9 行的通道先返回,則執行第 10 行邏輯,表示正常接收到伺服器資料;如果第 11 行的通道先返回,則執行第 12 行的邏輯,表示請求超時,返回錯誤。

伺服器接收和反餽資料

伺服器接收到用戶端的任意資料後,先列印再通過通道返回給用戶端一個固定字串,表示伺服器已經收到請求。
// 模擬RPC伺服器端接收用戶端請求和回應
func RPCServer(ch chan string) {
    for {
        // 接收用戶端請求
        data := <-ch

        // 列印接收到的資料
        fmt.Println("server received:", data)

        //向用戶端反饋已收到
        ch <- "roger"
    }
}
程式碼說明如下:
  • 第 3 行,構造出一個無限迴圈。伺服器處理完用戶端請求後,通過無限迴圈繼續處理下一個用戶端請求。
  • 第 5 行,通過字串通道接收一個用戶端的請求。
  • 第 8 行,將接收到的資料列印出來。
  • 第 11 行,給用戶端反饋一個字串。

執行整個程式,用戶端可以正確收到伺服器返回的資料,用戶端 RPCClient() 函數的程式碼按下面程式碼中加粗部分的分支執行。
// 等待伺服器返回
select {
case ack := <-ch:  // 接收到伺服器返回資料
    return ack, nil
case <-time.After(time.Second):  // 超時
    return "", errors.New("Time out")
}
程式輸出如下:

server received: hi
client received roger

模擬超時

上面的例子雖然有用戶端超時處理,但是永遠不會觸發,因為伺服器的處理速度很快,也沒有真正的網路延時或者“伺服器宕機”的情況。因此,為了展示 select 中超時的處理,在伺服器邏輯中增加一條語句,故意讓伺服器延時處理一段時間,造成用戶端請求超時,程式碼如下:
// 模擬RPC伺服器端接收用戶端請求和回應
func RPCServer(ch chan string) {
    for {
        // 接收用戶端請求
        data := <-ch

        // 列印接收到的資料
        fmt.Println("server received:", data)

        // 通過睡眠函數讓程式執行阻塞2秒的任務
        time.Sleep(time.Second * 2)

        // 反饋給用戶端收到
        ch <- "roger"
    }
}
第 11 行中,time.Sleep() 函數會讓 goroutine 執行暫停 2 秒。使用這種方法模擬伺服器延時,造成用戶端超時。用戶端處理超時 1 秒時通道就會返回:
// 等待伺服器返回
select {
case ack := <-ch:  // 接收到伺服器返回資料
    return ack, nil
case <-time.After(time.Second):  // 超時
    return "", errors.New("Time out")
}
上面程式碼中,加黑部分的程式碼就會被執行。

主流程

主流程中會建立一個無緩衝的字串格式通道。將通道傳給伺服器的 RPCServer() 函數,這個函數並行執行。使用 RPCClient() 函數通過 ch 對伺服器發出 RPC 請求,同時接收伺服器反饋資料或者等待超時。參考下面程式碼:
func main() {

    // 建立一個無緩衝字串通道
    ch := make(chan string)

    // 並行執行伺服器邏輯
    go RPCServer(ch)

    // 用戶端請求資料和接收資料
    recv, err := RPCClient(ch, "hi")
    if err != nil {
            // 發生錯誤列印
        fmt.Println(err)
    } else {
            // 正常接收到資料
        fmt.Println("client received", recv)
    }

}
程式碼說明如下:
  • 第 4 行,建立無緩衝的字串通道,這個通道用於模擬網路和 socke t概念,既可以從通道接收資料,也可以傳送。
  • 第 7 行,並行執行伺服器邏輯。伺服器一般都是獨立進程的,這裡使用並行將伺服器和用戶端邏輯同時在一個進程內執行。
  • 第 10 行,使用 RPCClient() 函數,傳送“hi”給伺服器,同步等待伺服器返回。
  • 第 13 行,如果通訊過程發生錯誤,列印錯誤。
  • 第 16 行,正常接收時,列印收到的資料。