手把手帶你用Node手寫WebSocket協定

2023-02-17 06:00:58

我們知道,http 是一問一答的模式,使用者端向伺服器傳送 http 請求,伺服器返回 http 響應。

這種模式對資源、資料的載入足夠用,但是需要資料推播的場景就不合適了。

有同學說,http2 不是有 server push 麼?

那只是推資源用的:

比如瀏覽器請求了 html,伺服器端可以連帶把 css 一起推播給瀏覽器。瀏覽器可以決定接不接收。【相關教學推薦:、】

對於即時通訊等實時性要求高的場景,就需要用 websocket 了。

websocket 嚴格來說和 http 沒什麼關係,是另外一種協定格式。但是需要一次從 http 到 websocekt 的切換過程。

切換過程詳細來說是這樣的:

請求的時候帶上這幾個 header:

Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Key: Ia3dQjfWrAug/6qm7mTZOg==
登入後複製

前兩個很容易理解,就是升級到 websocket 協定的意思。

第三個 header 是保證安全用的一個 key。

伺服器端返回這樣的 header:

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: JkE58n3uIigYDMvC+KsBbGZsp1A=
登入後複製

和請求 header 類似,Sec-WebSocket-Accept 是對請求帶過來的 Sec-WebSocket-Key 處理之後的結果。

加入這個 header 的校驗是為了確定對方一定是有 WebSocket 能力的,不然萬一建立了連線對方卻一直沒訊息,那不就白等了麼。

那 Sec-WebSocket-Key 經過什麼處理能得到 Sec-WebSocket-Accept 呢?

我用 node 實現了一下,是這樣的:

const crypto = require('crypto');

function hashKey(key) {
  const sha1 = crypto.createHash('sha1');
  sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
  return sha1.digest('base64');
}
登入後複製

也就是用使用者端傳過來的 key,加上一個固定的字串,經過 sha1 加密之後,轉成 base64 的結果。

這個字串 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 是固定的,不信你搜搜看:

隨便找個有 websocket 的網站,比如知乎就有:

過濾出 ws 型別的請求,看看這幾個 header,是不是就是前面說的那些。

這個 Sec-WebSocket-Key 是 wk60yiym2FEwCAMVZE3FgQ==

而響應的 Sec-WebSocket-Accept 是 XRfPnS+8xl11QWZherej/dkHPHM=

我們算算看:

是不是一毛一樣!

這就是 websocket 升級協定時候的 Sec-WebSocket-Key 對應的 Sec-WebSocket-Accept 的計算過程。

這一步之後就換到 websocket 的協定了,那是一個全新的協定:

勾選 message 這一欄可以看到傳輸的訊息,可以是文字、可以是二進位制:

全新的協定?那具體是什麼樣的協定呢?

這樣的:

大家習慣的 http 協定是 key:value 的 header 帶個 body 的:

它是文字協定,每個 header 都是容易理解的字元。

這樣好懂是好懂,但是傳輸佔的空間太大了。

而 websocket 是二進位制協定,一個位元組可以用來儲存很多資訊:

比如協定的第一個位元組,就儲存了 FIN(結束標誌)、opcode(內容型別是 binary 還是 text) 等資訊。

第二個位元組儲存了 mask(是否有加密),payload(資料長度)。

僅僅兩個位元組,儲存了多少資訊呀!

這就是二進位制協定比文字協定好的地方。

我們看到的 weboscket 的 message 的收發,其實底層都是拼成這樣的格式。

只是瀏覽器幫我們解析了這種格式的協定資料。

這就是 weboscket 的全部流程了。

其實還是挺清晰的,一個切換協定的過程,然後是二進位制的 weboscket 協定的收發。

那我們就用 Node.js 自己實現一個 websocket 伺服器吧!

定義個 MyWebsocket 的 class:

const { EventEmitter } = require('events');
const http = require('http');

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super(options);

    const server = http.createServer();
    server.listen(options.port || 8080);

    server.on('upgrade', (req, socket) => {
      
    });
  }
}
登入後複製

繼承 EventEmitter 是為了可以用 emit 傳送一些事件,外界可以通過 on 監聽這個事件來處理。

我們在建構函式裡建立了一個 http 服務,當 ungrade 事件發生,也就是收到了 Connection: upgrade 的 header 的時候,返回切換協定的 header。

返回的 header 前面已經見過了,就是要對 sec-websocket-key 做下處理。

server.on('upgrade', (req, socket) => {
  this.socket = socket;
  socket.setKeepAlive(true);

  const resHeaders = [
    'HTTP/1.1 101 Switching Protocols',
    'Upgrade: websocket',
    'Connection: Upgrade',
    'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
    '',
    ''
  ].join('\r\n');
  socket.write(resHeaders);

  socket.on('data', (data) => {
    console.log(data)
  });
  socket.on('close', (error) => {
      this.emit('close');
  });
});
登入後複製

我們拿到 socket,返回上面的 header,其中 key 做的處理就是前面聊過的演演算法:

function hashKey(key) {
  const sha1 = crypto.createHash('sha1');
  sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
  return sha1.digest('base64');
}
登入後複製

就這麼簡單,就已經完成協定切換了。

不信我們試試看。

引入我們實現的 ws 伺服器,跑起來:

const MyWebSocket = require('./ws');
const ws = new MyWebSocket({ port: 8080 });

ws.on('data', (data) => {
  console.log('receive data:' + data);
});

ws.on('close', (code, reason) => {
  console.log('close:', code, reason);
});
登入後複製

然後新建這樣一個 html:

<!DOCTYPE HTML>
<html>
<body>
    <script>
        const ws = new WebSocket("ws://localhost:8080");

        ws.onopen = function () {
            ws.send("傳送資料");
            setTimeout(() => {
                ws.send("傳送資料2");
            }, 3000)
        };

        ws.onmessage = function (evt) {
            console.log(evt)
        };

        ws.onclose = function () {
        };
    </script>
</body>

</html>
登入後複製
登入後複製

用瀏覽器的 WebSocket api 建立連線,傳送訊息。

用 npx http-server . 起個靜態服務。

然後瀏覽器存取這個 html:

這時開啟 devtools 你就會發現協定切換成功了:

這 3 個 header 還有 101 狀態碼都是我們返回的。

message 裡也可以看到傳送的訊息:

再去伺服器端看看,也收到了這個訊息:

只不過是 Buffer 的,也就是二進位制的。

接下來只要按照協定格式解析這個 Buffer,並且生成響應格式的協定資料 Buffer 返回就可以收發 websocket 資料了。

這一部分還是比較麻煩的,我們一點點來看。

我們需要第一個位元組的後四位,也就是 opcode。

這樣寫:

const byte1 = bufferData.readUInt8(0);
let opcode = byte1 & 0x0f;
登入後複製

讀取 8 位無符號整數的內容,也就是一個位元組的內容。引數是偏移的位元組,這裡是 0。

通過位運算取出後四位,這就是 opcode 了。

然後再處理第二個位元組:

第一位是 mask 標誌位,後 7 位是 payload 長度。

可以這樣取:

const byte2 = bufferData.readUInt8(1);
const str2 = byte2.toString(2);
const MASK = str2[0];
let payloadLength = parseInt(str2.substring(1), 2);
登入後複製

還是用 buffer.readUInt8 讀取一個位元組的內容。

先轉成二進位制字串,這時第一位就是 mask,然後再擷取後 7 位的子串,parseInt 成數位,這就是 payload 長度了。

這樣前兩個位元組的協定內容就解析完了。

有同學可能問了,後面咋還有倆 payload 長度呢?

這是因為資料不一定有多長,可能需要 16 位存長度,可能需要 32 位。

於是 websocket 協定就規定了如果那個 7 位的內容不超過 125,那它就是 payload 長度。

如果 7 位的內容是 126,那就不用它了,用後面的 16 位的內容作為 payload 長度。

如果 7 位的內容是 127,也不用它了,用後面那個 64 位的內容作為 payload 長度。

其實還是容易理解的,就是 3 個 if else。

用程式碼寫出來就是這樣的:

let payloadLength = parseInt(str2.substring(1), 2);

let curByteIndex = 2;

if (payloadLength === 126) {
  payloadLength = bufferData.readUInt16BE(2);
  curByteIndex += 2;
} else if (payloadLength === 127) {
  payloadLength = bufferData.readBigUInt64BE(2);
  curByteIndex += 8;
}
登入後複製

這裡的 curByteIndex 是儲存當前處理到第幾個位元組的。

如果是 126,那就從第 3 個位元組開始,讀取 2 個位元組也就是 16 位的長度,用 buffer.readUInt16BE 方法。

如果是 127,那就從第 3 個位元組開始,讀取 8 個位元組也就是 64 位的長度,用 buffer.readBigUInt64BE 方法。

這樣就拿到了 payload 的長度,然後再用這個長度去擷取內容就好了。

但在讀取資料之前,還有個 mask 要處理,這個是用來給內容解密的:

讀 4 個位元組,就是 mask key。

再後面的就可以根據 payload 長度讀出來。

let realData = null;

if (MASK) {
  const maskKey = bufferData.slice(curByteIndex, curByteIndex + 4);  
  curByteIndex += 4;
  const payloadData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);
  realData = handleMask(maskKey, payloadData);
} else {
  realData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);;
}
登入後複製

然後用 mask key 來解密資料。

這個演演算法也是固定的,用每個位元組的 mask key 和資料的每一位做按位元互斥或就好了:

function handleMask(maskBytes, data) {
  const payload = Buffer.alloc(data.length);
  for (let i = 0; i < data.length; i++) {
    payload[i] = maskBytes[i % 4] ^ data[i];
  }
  return payload;
}
登入後複製

這樣,我們就拿到了最終的資料!

但是傳給處理程式之前,還要根據型別來處理下,因為內容分幾種型別,也就是 opcode 有幾種值:

const OPCODES = {
  CONTINUE: 0,
  TEXT: 1, // 文字
  BINARY: 2, // 二進位制
  CLOSE: 8,
  PING: 9,
  PONG: 10,
};
登入後複製

我們只處理文字和二進位制就好了:

handleRealData(opcode, realDataBuffer) {
    switch (opcode) {
      case OPCODES.TEXT:
        this.emit('data', realDataBuffer.toString('utf8'));
        break;
      case OPCODES.BINARY:
        this.emit('data', realDataBuffer);
        break;
      default:
        this.emit('close');
        break;
    }
}
登入後複製

文字就轉成 utf-8 的字串,二進位制資料就直接用 buffer 的資料。

這樣,處理程式裡就能拿到解析後的資料。

我們來試一下:

之前我們已經能拿到 weboscket 協定內容的 buffer 了:

而現在我們能正確解析出其中的資料:

至此,我們 websocket 協定的解析成功了!

這樣的協定格式的資料叫做 frame,也就是幀:

解析可以了,接下來我們再實現資料的傳送。

傳送也是構造一樣的 frame 格式。

定義這樣一個 send 方法:

send(data) {
    let opcode;
    let buffer;
    if (Buffer.isBuffer(data)) {
      opcode = OPCODES.BINARY;
      buffer = data;
    } else if (typeof data === 'string') {
      opcode = OPCODES.TEXT;
      buffer = Buffer.from(data, 'utf8');
    } else {
      console.error('暫不支援傳送的資料型別')
    }
    this.doSend(opcode, buffer);
}

doSend(opcode, bufferDatafer) {
   this.socket.write(encodeMessage(opcode, bufferDatafer));
}
登入後複製

根據傳送的是文字還是二進位制資料來對內容作處理。

然後構造 websocket 的 frame:

function encodeMessage(opcode, payload) {
  //payload.length < 126
  let bufferData = Buffer.alloc(payload.length + 2 + 0);;
  
  let byte1 = parseInt('10000000', 2) | opcode; // 設定 FIN 為 1
  let byte2 = payload.length;

  bufferData.writeUInt8(byte1, 0);
  bufferData.writeUInt8(byte2, 1);

  payload.copy(bufferData, 2);
  
  return bufferData;
}
登入後複製

我們只處理資料長度小於 125 的情況。

第一個位元組是 opcode,我們把第一位置 1 ,通過按位元或的方式。

伺服器端給使用者端回訊息不需要 mask,所以第二個位元組就是 payload 長度。

分別把這前兩個位元組的資料寫到 buffer 裡,指定不同的 offset:

bufferData.writeUInt8(byte1, 0);
bufferData.writeUInt8(byte2, 1);
登入後複製

之後把 payload 資料放在後面:

 payload.copy(bufferData, 2);
登入後複製

這樣一個 websocket 的 frame 就構造完了。

我們試一下:

收到使用者端訊息後,每兩秒回一個訊息。

收發訊息都成功了!

就這樣,我們自己實現了一個 websocket 伺服器,實現了 websocket 協定的解析和生成!

完整程式碼如下:

MyWebSocket:

//ws.js
const { EventEmitter } = require('events');
const http = require('http');
const crypto = require('crypto');

function hashKey(key) {
  const sha1 = crypto.createHash('sha1');
  sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
  return sha1.digest('base64');
}

function handleMask(maskBytes, data) {
  const payload = Buffer.alloc(data.length);
  for (let i = 0; i < data.length; i++) {
    payload[i] = maskBytes[i % 4] ^ data[i];
  }
  return payload;
}

const OPCODES = {
  CONTINUE: 0,
  TEXT: 1,
  BINARY: 2,
  CLOSE: 8,
  PING: 9,
  PONG: 10,
};

function encodeMessage(opcode, payload) {
  //payload.length < 126
  let bufferData = Buffer.alloc(payload.length + 2 + 0);;
  
  let byte1 = parseInt('10000000', 2) | opcode; // 設定 FIN 為 1
  let byte2 = payload.length;

  bufferData.writeUInt8(byte1, 0);
  bufferData.writeUInt8(byte2, 1);

  payload.copy(bufferData, 2);
  
  return bufferData;
}

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super(options);

    const server = http.createServer();
    server.listen(options.port || 8080);

    server.on('upgrade', (req, socket) => {
      this.socket = socket;
      socket.setKeepAlive(true);

      const resHeaders = [
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
        '',
        ''
      ].join('\r\n');
      socket.write(resHeaders);

      socket.on('data', (data) => {
        this.processData(data);
        // console.log(data);
      });
      socket.on('close', (error) => {
          this.emit('close');
      });
    });
  }

  handleRealData(opcode, realDataBuffer) {
    switch (opcode) {
      case OPCODES.TEXT:
        this.emit('data', realDataBuffer.toString('utf8'));
        break;
      case OPCODES.BINARY:
        this.emit('data', realDataBuffer);
        break;
      default:
        this.emit('close');
        break;
    }
  }

  processData(bufferData) {
    const byte1 = bufferData.readUInt8(0);
    let opcode = byte1 & 0x0f; 
    
    const byte2 = bufferData.readUInt8(1);
    const str2 = byte2.toString(2);
    const MASK = str2[0];

    let curByteIndex = 2;
    
    let payloadLength = parseInt(str2.substring(1), 2);
    if (payloadLength === 126) {
      payloadLength = bufferData.readUInt16BE(2);
      curByteIndex += 2;
    } else if (payloadLength === 127) {
      payloadLength = bufferData.readBigUInt64BE(2);
      curByteIndex += 8;
    }

    let realData = null;
    
    if (MASK) {
      const maskKey = bufferData.slice(curByteIndex, curByteIndex + 4);  
      curByteIndex += 4;
      const payloadData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);
      realData = handleMask(maskKey, payloadData);
    } 
    
    this.handleRealData(opcode, realData);
  }

  send(data) {
    let opcode;
    let buffer;
    if (Buffer.isBuffer(data)) {
      opcode = OPCODES.BINARY;
      buffer = data;
    } else if (typeof data === 'string') {
      opcode = OPCODES.TEXT;
      buffer = Buffer.from(data, 'utf8');
    } else {
      console.error('暫不支援傳送的資料型別')
    }
    this.doSend(opcode, buffer);
  }

  doSend(opcode, bufferDatafer) {
    this.socket.write(encodeMessage(opcode, bufferDatafer));
  }
}

module.exports = MyWebsocket;
登入後複製

Index:

const MyWebSocket = require('./ws');
const ws = new MyWebSocket({ port: 8080 });

ws.on('data', (data) => {
  console.log('receive data:' + data);
  setInterval(() => {
    ws.send(data + ' ' + Date.now());
  }, 2000)
});

ws.on('close', (code, reason) => {
  console.log('close:', code, reason);
});
登入後複製

html:

<!DOCTYPE HTML>
<html>
<body>
    <script>
        const ws = new WebSocket("ws://localhost:8080");

        ws.onopen = function () {
            ws.send("傳送資料");
            setTimeout(() => {
                ws.send("傳送資料2");
            }, 3000)
        };

        ws.onmessage = function (evt) {
            console.log(evt)
        };

        ws.onclose = function () {
        };
    </script>
</body>

</html>
登入後複製
登入後複製

總結

實時性較高的需求,我們會用 websocket 實現,比如即時通訊、遊戲等場景。

websocket 和 http 沒什麼關係,但從 http 到 websocket 需要一次切換的過程。

這個切換過程除了要帶 upgrade 的 header 外,還要帶 sec-websocket-key,伺服器端根據這個 key 算出結果,通過 sec-websocket-accept 返回。響應是 101 Switching Protocols 的狀態碼。

這個計算過程比較固定,就是 key + 固定的字串 通過 sha1 加密後再 base64 的結果。

加這個機制是為了確保對方一定是 websocket 伺服器,而不是隨意返回了個 101 狀態碼。

之後就是 websocket 協定了,這是個二進位制協定,我們根據格式完成了 websocket 幀的解析和生成。

這樣就是一個完整的 websocket 協定的實現了。

我們自己手寫了一個 websocket 服務,有沒有感覺對 websocket 的理解更深了呢?

更多node相關知識,請存取:!

以上就是手把手帶你用Node手寫WebSocket協定的詳細內容,更多請關注TW511.COM其它相關文章!