什麼是RPC?聊聊node中怎麼實現 RPC 通訊

2022-11-03 22:00:31

node.js極速入門課程:進入學習

【相關教學推薦:】

什麼是RPC?

RPC:Remote Procedure Call(遠端過程呼叫)是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函數/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語意和傳達呼叫的資料。

伺服器和伺服器之間的通訊

RPC vs HTTP

相同點

  • 都是兩臺計算機之間的網路通訊。ajax是瀏覽器和伺服器之間的通行,RPC是伺服器與伺服器之間的通行
  • 需要雙方約定一個資料格式

不同點

  • 定址伺服器不同

ajax 是使用 DNS作為定址服務獲取域名所對應的ip地址,瀏覽器拿到ip地址之後傳送請求獲取資料。

RPC一般是在內網裡面相互請求,所以它一般不用DNS做定址服務。因為在內網,所以可以使用規定的id或者一個虛擬vip,比如v5:8001,然後到定址伺服器獲取v5所對應的ip地址。

  • 應用層協定不同

ajax使用http協定,它是一個文字協定,我們互動資料的時候檔案格式要麼是html,要麼是json物件,使用json的時候就是key-value的形式。

RPC採用二進位制協定。採用二進位制傳輸,它傳輸的包是這樣子的[0001 0001 0111 0110 0010],裡面都是二進位制,一般採用那幾位表示一個欄位,比如前6位是一個欄位,依次類推。

這樣就不需要http傳輸json物件裡面的key,所以有更小的資料體積。

因為傳輸的是二進位制,更適合於計算機來理解,文字協定更適合人類理解,所以計算機去解讀各個欄位的耗時是比文字協定少很多的。

RPC採用二進位制有更小的資料體積,及更快的解讀速度。

  • TCP通訊方式
  • 單工通訊:只能使用者端給伺服器端發訊息,或者只能伺服器端給使用者端發訊息

  • 半雙工通訊:在某個時間段內只能使用者端給伺服器端發訊息,過了這個時間段伺服器端可以給使用者端發訊息。如果把時間分成很多時間片,在一個時間片內就屬於單工通訊

  • 全雙工通訊:使用者端和伺服器端能相互通訊

選擇這三種通訊方式的哪一種主要考慮的因素是:實現難度和成本。全雙工通訊是要比半雙工通訊的成本要高的,在某些場景下還是可以考慮使用半雙工通訊。

ajax是一種半雙工通訊。http是文字協定,但是它底層是tcp協定,http文字在tcp這一層會經歷從二進位制資料流到文字的轉換過程。

理解RPC只是在更深入地理解前端技術。

buffer編解碼二進位制封包

建立buffer

buffer.from: 從已有的資料建立二進位制

const buffer1 = Buffer.from('geekbang')
const buffer2 = Buffer.from([0, 1, 2, 3, 4])


<Buffer 67 65 65 6b 62 61 6e 67>
<Buffer 00 01 02 03 04>
登入後複製

buffer.alloc: 建立一個空的二進位制

const buffer3 = Buffer.alloc(20)

<Buffer 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00>
登入後複製

往buffer裡面寫東西

  • buffer.write(string, offset): 寫入字串
  • buffer.writeInt8(value, offset): int8表示二進位制8位元(8位元表示一個位元組)所能表示的整數,offset開始寫入之前要跳過的位元組數。
  • buffer.writeInt16BE(value, offset): int16(兩個位元組數),表示16個二進位制位所能表示的整數,即32767。超過這個數程式會報錯。
const buffer = Buffer.from([1, 2, 3, 4]) // <Buffer 01 02 03 04>

// 往第二個位元組裡面寫入12
buffer.writeInt8(12, 1) // <Buffer 01 0c 03 04>
登入後複製

大端BE與小端LE:主要是對於2個以上位元組的資料排列方式不同(writeInt8因為只有一個位元組,所以沒有大端和小端),大端的話就是低位地址放高位,小端就是低位地址放低位。如下:

const buffer = Buffer.from([1, 2, 3, 4])

buffer.writeInt16BE(512, 2) // <Buffer 01 02 02 00>
buffer.writeInt16LE(512, 2) // <Buffer 01 02 00 02>
登入後複製

RPC傳輸的二進位制如何表示傳遞的欄位

PC傳輸的二進位制是如何表示欄位的呢?現在有個二進位制包[00, 00, 00, 00, 00, 00, 00],我們假定前三個位元組表示一個欄位值,後面兩個表示一個欄位的值,最後兩個也表示一個欄位的值。那寫法如下:

writeInt16BE(value, 0)
writeInt16BE(value, 2)
writeInt16BE(value, 4)
登入後複製

發現像這樣寫,不僅要知道寫入的值,還要知道值的資料型別,這樣就很麻煩。不如json格式那麼方便。針對這種情況業界也有解決方案。npm有個庫protocol-buffers,把我們寫的引數轉化為buffer

// test.proto 定義的協定檔案
message Column {
  required float num  = 1;
  required string payload = 2;
}
// index.js
const fs = require('fs')
var protobuf = require('protocol-buffers')
var messages = protobuf(fs.readFileSync('test.proto'))

var buf = messages.Column.encode({
	num: 42,
	payload: 'hello world'
})
console.log(buf)
// <Buffer 0d 00 00 28 42 12 0b 68 65 6c 6c 6f 20 77 6f 72 6c 64>

var obj = messages.Column.decode(buf)
console.log(obj)
// { num: 42, payload: 'hello world' }
登入後複製

net建立RPC通道

半雙工通訊

伺服器端程式碼:

const net = require('net')

const LESSON_DATA = {
  136797: '01 | 課程介紹',
  136798: '02 | 內容綜述',
  136799: '03 | Node.js是什麼?',
  136800: '04 | Node.js可以用來做什麼?',
  136801: '05 | 課程實戰專案介紹',
  136803: '06 | 什麼是技術預研?',
  136804: '07 | Node.js開發環境安裝',
  136806: '08 | 第一個Node.js程式:石頭剪刀布遊戲',
  136807: '09 | 模組:CommonJS規範',
  136808: '10 | 模組:使用模組規範改造石頭剪刀布遊戲',
  136809: '11 | 模組:npm',
  141994: '12 | 模組:Node.js內建模組',
  143517: '13 | 非同步:非阻塞I/O',
  143557: '14 | 非同步:非同步程式設計之callback',
  143564: '15 | 非同步:事件迴圈',
  143644: '16 | 非同步:非同步程式設計之Promise',
  146470: '17 | 非同步:非同步程式設計之async/await',
  146569: '18 | HTTP:什麼是HTTP伺服器?',
  146582: '19 | HTTP:簡單實現一個HTTP伺服器'
}

const server = net.createServer(socket => {
  // 監聽使用者端傳送的訊息
  socket.on('data', buffer => {
    const lessonId = buffer.readInt32BE()
    setTimeout(() => {
      // 往使用者端傳送訊息
      socket.write(LESSON_DATA[lessonId])
    }, 1000)
  })
})

server.listen(4000)
登入後複製

使用者端程式碼:

const net = require('net')

const socket = new net.Socket({})

const LESSON_IDS = [
  '136797',
  '136798',
  '136799',
  '136800',
  '136801',
  '136803',
  '136804',
  '136806',
  '136807',
  '136808',
  '136809',
  '141994',
  '143517',
  '143557',
  '143564',
  '143644',
  '146470',
  '146569',
  '146582'
]

socket.connect({
  host: '127.0.0.1',
  port: 4000
})

let buffer = Buffer.alloc(4)
buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

// 往伺服器端傳送訊息
socket.write(buffer)

// 監聽從伺服器端傳回的訊息
socket.on('data', buffer => {
  console.log(buffer.toString())

  // 獲取到資料之後再次傳送訊息
  buffer = Buffer.alloc(4)
  buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

  socket.write(buffer)
})
登入後複製

以上半雙工通訊步驟如下:

  • 使用者端傳送訊息 socket.write(buffer)
  • 伺服器端接受訊息後往使用者端傳送訊息 socket.write(buffer)
  • 使用者端接受訊息後再次傳送訊息

這樣在一個時間端之內,只有一個端往另一個端傳送訊息,這樣就實現了半雙工通訊。那如何實現全雙工通訊呢,也就是在使用者端往伺服器端傳送訊息的同時,伺服器端還沒有訊息返回給使用者端之前,使用者端又傳送了一個訊息給伺服器端。

全雙工通訊

先來看一個場景:

image.png

使用者端傳送了一個id1的請求,但是伺服器端還來不及返回,接著使用者端又傳送了一個id2的請求。

等了一個之後,伺服器端先把id2的結果返回了,然後再把id1的結果返回。

那如何結果匹配到對應的請求上呢?

如果按照時間順序,那麼id1的請求對應了id2的結果,因為id2是先返回的;id2的請求對應了id1的結果,這樣就導致請求包和返回包錯位的情況。

怎麼辦呢?

我們可以給請求包和返回包都帶上序號,這樣就能對應上。

錯位處理

使用者端程式碼:

socket.on('data', buffer => {
  // 包序號
  const seqBuffer = buffer.slice(0, 2)
  // 伺服器端返回的內容
  const titleBuffer = buffer.slice(2)
    
  console.log(seqBuffer.readInt16BE(), titleBuffer.toString())
})

// 包序號
let seq = 0
function encode(index) {
  // 請求包的長度現在是6 = 2(包序號) + 4(課程id)
  buffer = Buffer.alloc(6)
  buffer.writeInt16BE(seq)
  buffer.writeInt32BE(LESSON_IDS[index], 2)

  seq++
  return buffer
}

// 每50ms傳送一次請求
setInterval(() => {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}, 50)
登入後複製

伺服器端程式碼:

const server = net.createServer(socket => {
  socket.on('data', buffer => {
    // 把包序號取出
    const seqBuffer = buffer.slice(0, 2)
    // 從第2個位元組開始讀取
    const lessonId = buffer.readInt32BE(2)
    setTimeout(() => {
      const buffer = Buffer.concat([
        seqBuffer,
        Buffer.from(LESSON_DATA[lessonId])
      ])
      socket.write(buffer)
      // 這裡返回時間採用隨機的,這樣就不會按順序返回,就可以測試錯位的情況
    }, 10 + Math.random() * 1000)
  })
})
登入後複製
  • 使用者端把包序號和對應的id給伺服器端
  • 伺服器端取出包序號和對應的id,然後把包序號和id對應的內容返回給使用者端,同時設定返回的時間是隨機的,這樣就不會按照順序返回。

粘包處理

如果我們這樣傳送請求:

for (let i = 0; i < 100; i++) {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}
登入後複製

我們發現伺服器端接收到的資訊如下:

<Buffer 00 00 00 02 16 64 00 01 00 02 16 68 00 02 00 02 31 1c 00 03 00 02 3c 96 00 04 00 02 16 68 00 05 00 02 16 5e 00 06 00 02 16 66 00 07 00 02 16 67 00 08 ... 550 more bytes>
登入後複製

這是因為TCP自己做的一個優化,它會把所有的請求包拼接在一起,這樣就會產生粘包的現象。

伺服器端需要把包進行拆分,拆分成100個小包。

那如何拆分呢?

首先使用者端傳送的封包包括兩部分:定長的包頭和不定長的包體

包頭又分為兩部分:包序號及包體的長度。只有知道包體的長度,才能知道從哪裡進行分割。

let seq = 0
function encode(data) {
    // 正常情況下,這裡應該是使用 protocol-buffers 來encode一段代表業務資料的封包
    // 為了不要混淆重點,這個例子比較簡單,就直接把課程id轉buffer傳送
    const body = Buffer.alloc(4);
    body.writeInt32BE(LESSON_IDS[data.id]);

    // 一般來說,一個rpc呼叫的封包會分為定長的包頭和不定長的包體兩部分
    // 包頭的作用就是用來記載包的序號和包的長度,以實現全雙工通訊
    const header = Buffer.alloc(6); // 包序號佔2個位元組,包體長度佔4個位元組,共6個位元組
    header.writeInt16BE(seq)
    header.writeInt32BE(body.length, 2);

    // 包頭和包體拼起來傳送
    const buffer = Buffer.concat([header, body])

    console.log(`包${seq}傳輸的課程id為${LESSON_IDS[data.id]}`);
    seq++;
    return buffer;
}

// 並行
for (let i = 0; i < 100; i++) {
    id = Math.floor(Math.random() * LESSON_IDS.length)
    socket.write(encode({ id }))
}
登入後複製

伺服器端進行拆包

const server = net.createServer(socket => {
  let oldBuffer = null
  socket.on('data', buffer => {
    // 把上一次data事件使用殘餘的buffer接上來
    if (oldBuffer) {
      buffer = Buffer.concat([oldBuffer, buffer])
    }
    let packageLength = 0
    // 只要還存在可以解成完整包的包長
    while ((packageLength = checkComplete(buffer))) {
      // 確定包的長度後進行slice分割
      const package = buffer.slice(0, packageLength)
      // 剩餘的包利用迴圈繼續分割
      buffer = buffer.slice(packageLength)

      // 把這個包解成資料和seq
      const result = decode(package)

      // 計算得到要返回的結果,並write返回
      socket.write(encode(LESSON_DATA[result.data], result.seq))
    }

    // 把殘餘的buffer記下來
    oldBuffer = buffer
  })
})
登入後複製

checkComplete 函數的作用來確定一個封包的長度,然後進行分割:

function checkComplete(buffer) {
  // 如果包的長度小於6個位元組說明只有包頭,沒有包體,那麼直接返回0
  if (buffer.length <= 6) {
    return 0
  }
  // 讀取包頭的第二個位元組,取出包體的長度
  const bodyLength = buffer.readInt32BE(2)
  // 請求包包括包頭(6個位元組)和包體body
  return 6 + bodyLength
}
登入後複製

decode對包進行解密:

function decode(buffer) {
  // 讀取包頭
  const header = buffer.slice(0, 6)
  const seq = header.readInt16BE()
    
  // 讀取包體  
  // 正常情況下,這裡應該是使用 protobuf 來decode一段代表業務資料的封包
  // 為了不要混淆重點,這個例子比較簡單,就直接讀一個Int32即可
  const body = buffer.slice(6).readInt32BE()

  // 這裡把seq和資料返回出去
  return {
    seq,
    data: body
  }
}
登入後複製

encode把使用者端想要的資料轉化為二進位制返回,這個包同樣包括包頭和包體,包頭又包括包需要包序號和包體的長度。

function encode(data, seq) {
  // 正常情況下,這裡應該是使用 protobuf 來encode一段代表業務資料的封包
  // 為了不要混淆重點,這個例子比較簡單,就直接把課程標題轉buffer返回
  const body = Buffer.from(data)

  // 一般來說,一個rpc呼叫的封包會分為定長的包頭和不定長的包體兩部分
  // 包頭的作用就是用來記載包的序號和包的長度,以實現全雙工通訊
  const header = Buffer.alloc(6)
  header.writeInt16BE(seq)
  header.writeInt32BE(body.length, 2)

  const buffer = Buffer.concat([header, body])

  return buffer
}
登入後複製

當用戶端收到伺服器端傳送的包之後,同樣也要進行拆包,因為所有的包同樣都粘在一起了:

 <Buffer 00 00 00 00 00 1d 30 36 20 7c 20 e4 bb 80 e4 b9 88 e6 98 af e6 8a 80 e6 9c af e9 a2 84 e7 a0 94 ef bc 9f 00 01 00 00 00 1d 30 36 20 7c 20 e4 bb 80 e4 ... 539 more bytes>
登入後複製

因此,使用者端也需要拆包,拆包策略與伺服器端的拆包策略是一致的:

let oldBuffer = null
socket.on('data', buffer => {
  // 把上一次data事件使用殘餘的buffer接上來
  if (oldBuffer) {
    buffer = Buffer.concat([oldBuffer, buffer])
  }
  let completeLength = 0

  // 只要還存在可以解成完整包的包長
  while ((completeLength = checkComplete(buffer))) {
    const package = buffer.slice(0, completeLength)
    buffer = buffer.slice(completeLength)

    // 把這個包解成資料和seq
    const result = decode(package)
    console.log(`包${result.seq},返回值是${result.data}`)
  }

  // 把殘餘的buffer記下來
  oldBuffer = buffer
})
登入後複製

到這裡就實現了雙全工通行,這樣使用者端和伺服器端隨時都可以往對方發小訊息了。

更多node相關知識,請存取:!

以上就是什麼是RPC?聊聊node中怎麼實現 RPC 通訊的詳細內容,更多請關注TW511.COM其它相關文章!