隨著現代技術的快速發展,即時互動變得越來越重要。使用者不僅希望獲取資訊,而且希望以更直觀和實時的方式體驗它。這在聊天應用程式和其他實時通訊工具中尤為明顯,使用者習慣看到對方正在輸入的提示。
ChatGPT,作為 OpenAI 的代表性產品之一,不僅為使用者提供了強大的自然語言處理能力,而且關注使用者的整體互動體驗。在使用 ChatGPT 進行互動時,使用者可能已經注意到了一個細節:當它產生回覆時,回覆會像人類逐字輸入的方式逐漸出現,而不是一次性顯示完整答案。
這種打字效果給人一種彷彿與真人對話的感覺,進一步增強了其自然語言處理的真實感。一開始,許多開發者可能會誤以為這是通過 WebSockets 實現的,這是因為 WebSockets 是一種常用於實時通訊的技術。然而,仔細研究後,我們發現 ChatGPT 使用了一種不同的技術:基於 EventStream 的方法。更具體地說,它似乎是通過 SSE (Server-Sent Events) 來實現逐個字地推播答案的。
此外,考慮到 ChatGPT 的複雜性和其涉及的大量計算,響應時間可能會長於其他基於資料庫的簡單查詢。因此,採用 SSE 逐步推播結果的方式可以幫助減少使用者感到的等待時間,從而增強使用者體驗。
Server-Sent Events(通常簡稱為SSE)是一種允許伺服器向Web頁面傳送實時更新的技術。與WebSocket技術相比,SSE專門設計用於從伺服器到使用者端的單向通訊。這種單向性使其在某些場景中更為簡單和直觀。
單向通訊:SSE 專為從伺服器到使用者端的單向通訊設計。使用者端不能通過SSE直接傳送資料到伺服器,但可以通過其他方法如AJAX與伺服器進行互動。
基於HTTP:SSE 基於 HTTP 協定執行,不需要新的協定或埠。這使得它能夠輕鬆地在現有的Web應用架構中使用,並且通過標準的HTTP代理和中介軟體進行支援。
自動重連:如果連線斷開,瀏覽器會自動嘗試重新連線到伺服器。
格式簡單:SSE 使用簡單的文字格式傳送訊息,每個訊息都以兩個連續的換行符分隔。
原生瀏覽器支援:許多現代瀏覽器(如 Chrome、Firefox 和 Safari)已原生支援SSE,但需要注意的是,某些瀏覽器,如Internet Explorer和早期的Edge版本,不支援SSE。
雖然 SSE 與 WebSockets 在某種程度上有些相似,但它們之間還存在一些關鍵差異,如下所示:
對比項 | Server-Sent Events (SSE) | WebSockets |
---|---|---|
基於協定 | 基於 HTTP,簡化了連線和互動的過程 | 通常基於 WS/WSS(基於TCP),更為靈活 |
通訊能力 | 單向通訊:僅伺服器向用戶端傳送訊息 | 雙向通訊能力 |
設定 | 設定簡單,易於理解和使用 | 需要更復雜的設定和理解 |
斷線與訊息追蹤 | 自帶的斷線重連和訊息跟蹤功能 | 通常需要手動處理或使用額外庫 |
資料格式 | 通常為文字,但可以傳送經過編碼/壓縮的二進位制訊息 | 支援文字和原始二進位制訊息 |
事件處理 | 支援多種自定義事件 | 基本訊息機制,不能像SSE那樣自定義事件型別 |
連線並行性 | 連線數可能受到 HTTP 版本的限制,尤其是在HTTP/1.1中 | WebSocket被設計為支援更高的連線並行性 |
安全性 | 僅支援HTTP和HTTPS的安全機制 | 支援WS和WSS,可以在WSS上實現更強大的加密 |
瀏覽器相容性 | 大部分現代瀏覽器支援,但不是所有瀏覽器 | 幾乎所有現代瀏覽器都支援 |
開銷 | 由於基於HTTP,每次訊息可能有較大的頭部開銷 | 握手後,訊息頭部開銷相對較小 |
Server-Sent Events(SSE)是一個基於 HTTP 的協定,允許伺服器單向地向瀏覽器推播資訊。為了成功地使用 SSE,伺服器和使用者端都必須遵循一定的規範和流程。
當用戶端(例如瀏覽器)發出請求訂閱 SSE 服務時,伺服器需要通過設定特定的響應頭部資訊來確認該請求。這些頭部資訊包括:
Content-Type: text/event-stream
: 這表示返回的內容為事件流。
Cache-Control: no-cache
: 這確保伺服器推播的訊息不會被快取,以保障訊息的實時性。
Connection: keep-alive
: 這指示連線應始終保持開放,以便伺服器可以隨時傳送訊息。
SSE 使用簡單的文字格式來組織和傳送訊息。基本的訊息結構是由一系列行組成,每一行由欄位名、一個冒號和欄位值組成。
以下是訊息中可以使用的一些欄位及其用途:
event
: 定義了事件的型別。這可以幫助使用者端確定如何處理接收到的訊息。
id
: 提供事件的唯一識別符號。如果連線中斷,使用者端可以使用最後收到的事件 ID 來請求伺服器從某個點重新傳送訊息。
retry
: 指定了當連線斷開時,使用者端應等待多少毫秒再嘗試重新連線。這為連線中斷和重連提供了一種機制。
data
: 這是訊息的主體內容。它可以是任何 UTF-8 編碼的文字,而且可以跨多行。每行資料都會在使用者端解析時連線起來,中間使用換行符分隔。
為了確保訊息的正確和完整傳輸,伺服器通常在訊息的末尾新增一個空行,表示訊息的結束。
範例:
id: 123
event: update
data: {"message": "This is a test message"}
此外,SSE 也支援多條連續訊息的傳送。只要每條訊息之間使用兩個換行符隔開即可。
接入 SSE 並不困難,尤其在使用者端這邊。主流瀏覽器提供了EventSource
API,使得與 SSE 伺服器端建立和維護連線變得異常簡單。
首先,需要建立一個EventSource
物件,它將代表與伺服器的持久連線。初始化時,可以為它提供一些選項,以滿足特定需求。
const options = {
withCredentials: true // 允許跨域請求攜帶憑證
};
// 建立一個 EventSource 物件以開始監聽
const eventSource = new EventSource('your_server_url', options);
在上面的程式碼中,withCredentials
引數用於指示是否應該在請求中傳送憑證(例如 cookies)。這在跨域場景中可能會非常有用。
一旦與伺服器建立了連線,就可以開始監聽從伺服器傳送過來的事件。
通用事件處理:
預設情況下,EventSource
物件會對三種基本的事件型別進行響應:open
、message
和error
。可以設定對應的處理常式來對它們進行響應。
// 監聽連線開啟事件
eventSource.onopen = function(event) {
console.log('Connection to SSE server established!');
};
// 監聽標準訊息事件
eventSource.onmessage = function(event) {
console.log('Received data from server: ', event.data);
};
// 監聽錯誤事件
eventSource.onerror = function(event) {
console.error('An error occurred while receiving data:', event);
};
自定義事件處理:
除了上述的基本事件外,伺服器還可能傳送自定義的事件型別。為了處理這些事件,需要使用addEventListener()
方法。
// 監聽一個名為 "update" 的自定義事件
eventSource.addEventListener('update', function(event) {
console.log('Received update event:', event.data);
});
如果不再需要從伺服器接收事件,可以使用close
方法關閉連線。
eventSource.close();
關閉連線後,將不再接收任何事件,除非再次初始化EventSource
物件。
總結:使用EventSource
API,使用者端可以方便地與 SSE 伺服器互動,從而實時接收資料更新。這為建立響應迅速的 web 應用提供了極大的便利,同時避免了傳統的輪詢方式帶來的資源浪費。
const http = require('http');
const fs = require('fs');
// 初始化 HTTP 伺服器
http.createServer((req, res) => {
// 為了簡潔,將響應方法抽離成函數
function serveFile(filePath, contentType) {
fs.readFile(filePath, (err, data) => {
if (err) {
res.writeHead(500);
res.end('Error loading the file');
} else {
res.writeHead(200, {'Content-Type': contentType});
res.end(data);
}
});
}
function handleSSEConnection() {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
let id = 0;
const intervalId = setInterval(() => {
const message = {
event: 'customEvent',
id: id++,
retry: 30000,
data: { id, time: new Date().toISOString() }
};
for (let key in message) {
if (key !== 'data') {
res.write(`${key}: ${message[key]}\n`);
} else {
res.write(`data: ${JSON.stringify(message.data)}\n\n`);
}
}
}, 1000);
req.on('close', () => {
clearInterval(intervalId);
res.end();
});
}
switch (req.url) {
case '/':
serveFile('index.html', 'text/html');
break;
case '/events':
handleSSEConnection();
break;
default:
res.writeHead(404);
res.end();
break;
}
}).listen(3000);
console.log('Server listening on port 3000');
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE Demo</title>
</head>
<body>
<h1>SSE Demo</h1>
<button onclick="connectSSE()">建立 SSE 連線</button>
<button onclick="closeSSE()">斷開 SSE 連線</button>
<br /><br />
<div id="message"></div>
<script>
const messageElement = document.getElementById('message');
let eventSource;
// 連線 SSE
function connectSSE() {
eventSource = new EventSource('/events');
eventSource.addEventListener('customEvent', handleReceivedMessage);
eventSource.onopen = handleConnectionOpen;
eventSource.onerror = handleConnectionError;
}
// 斷開 SSE 連線
function closeSSE() {
eventSource.close();
appendMessage(`SSE 連線關閉,狀態${eventSource.readyState}`);
}
// 處理從伺服器端收到的訊息
function handleReceivedMessage(event) {
const data = JSON.parse(event.data);
appendMessage(`${data.id} --- ${data.time}`);
}
// 連線建立成功的處理常式
function handleConnectionOpen() {
appendMessage(`SSE 連線成功,狀態${eventSource.readyState}`);
}
// 連線發生錯誤的處理常式
function handleConnectionError() {
appendMessage(`SSE 連線錯誤,狀態${eventSource.readyState}`);
}
// 將訊息新增到頁面上
function appendMessage(message) {
messageElement.innerHTML += `${message}<br />`;
}
</script>
</body>
</html>
將上面的兩份程式碼儲存為server.js
和index.html
,並在命令列中執行node server.js
啟動伺服器端,然後在瀏覽器中開啟http://localhost:3000
即可看到 SSE 效果。
在業務真實使用場景中,基於SSE的方法存在一些問題和限制:
預設請求僅支援GET
方法。當前端需要向後端傳遞引數時,引數只能拼接在請求的 URL 上,對於複雜的業務場景來說實現較為麻煩。
對於伺服器端返回的資料格式有固定要求,必須按照event
、id
、retry
、data
的結構返回。
伺服器端傳送的資料可以在瀏覽器控制檯中檢視,這可能會暴露敏感資料,導致資料安全問題。
為了解決以上問題,並使其支援POST
請求以及自定義的返回資料格式,我們可以使用以下技巧
利用 Fetch API 的流處理能力,我們可以實現對 SSE 的擴充套件:
/**
* Utf8ArrayToStr: 將Uint8Array的資料轉為字串
* @param {Uint8Array} array - Uint8Array資料
* @return {string} - 轉換後的字串
*/
function Utf8ArrayToStr(array) {
const decoder = new TextDecoder();
return decoder.decode(array);
}
/**
* fetchStream: 建立一個SSE連線,並支援多種HTTP請求方式
* @param {string} url - 請求的URL地址
* @param {object} params - 請求的引數,包括HTTP方法、頭部、主體內容等
* @return {Promise} - 返回一個Promise物件
*/
const fetchStream = (url, params) => {
const { onmessage, onclose, ...otherParams } = params;
return fetch(url, otherParams)
.then(response => {
let reader = response.body?.getReader();
return new ReadableStream({
start(controller) {
function push() {
reader?.read().then(({ done, value }) => {
if (done) {
controller.close();
onclose?.();
return;
}
const decodedData = Utf8ArrayToStr(value);
console.log(decodedData);
onmessage?.(decodedData);
controller.enqueue(value);
push();
});
}
push();
}
});
})
.then(stream => {
return new Response(stream, {
headers: { "Content-Type": "text/html" }
}).text();
});
};
// 範例:呼叫fetchStream函數
fetchStream("/events", {
method: "POST", // 使用POST方法
headers: {
"content-type": "application/json"
},
credentials: "include",
body: JSON.stringify({
// 這裡列出了一些範例資料,實際業務場景請替換為你的資料
boxId: "exampleBoxId",
sessionId: "exampleSessionId",
queryContent: "exampleQueryContent"
}),
onmessage: res => {
console.log(res); // 當接收到訊息時的回撥
},
onclose: () => {
console.log("Connection closed."); // 當連線關閉時的回撥
}
});
我們定義一個名為eventStreamHandler.ts
的檔案
// 定義請求主體的介面,需要根據具體的應用場景定義具體的屬性
interface RequestBody {
// 範例屬性,具體屬性需要根據實際需求定義
key?: string;
}
// 錯誤響應的結構
interface ErrorResponse {
error: string;
detail: string;
}
// 返回值型別定義
type TextStream = ReadableStreamDefaultReader<Uint8Array>;
// 獲取資料並返回TextStream
async function fetchData(
url: string,
body: RequestBody,
accessToken: string,
onError: (message: string) => void
): Promise<TextStream | undefined> {
try {
// 嘗試發起請求
const response = await fetch(url, {
method: "POST",
cache: "no-cache",
keepalive: true,
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
Authorization: `Bearer ${accessToken}`,
},
body: JSON.stringify(body),
});
// 檢查是否有衝突,例如重複請求
if (response.status === 409) {
const error: ErrorResponse = await response.json();
onError(error.detail);
return undefined;
}
return response.body?.getReader();
} catch (error) {
onError(`Failed to fetch: ${error.message}`);
return undefined;
}
}
// 讀取流資料
async function readStream(reader: TextStream): Promise<string | null> {
const result = await reader.read();
return result.done ? null : new TextDecoder().decode(result.value);
}
// 處理文字流資料
async function processStream(
reader: TextStream,
onStart: () => void,
onText: (text: string) => void,
onError: (error: string) => void,
shouldClose: () => boolean
): Promise<void> {
try {
// 開始處理資料
onStart();
while (true) {
if (shouldClose()) {
await reader.cancel();
return;
}
const text = await readStream(reader);
if (text === null) break;
onText(text);
}
} catch (error) {
onError(`Processing stream failed: ${error.message}`);
}
}
/**
* 主要的匯出函數,用於處理流式文字資料。
*
* @param url 請求的URL。
* @param body 請求主體內容。
* @param accessToken 存取令牌。
* @param onStart 開始處理資料時的回撥。
* @param onText 接收到資料時的回撥。
* @param onError 錯誤處理回撥。
* @param shouldClose 判斷是否需要關閉流的函數。
*/
export async function streamText(
url: string,
body: RequestBody,
accessToken: string,
onStart: () => void,
onText: (text: string) => void,
onError: (error: string) => void,
shouldClose: () => boolean
): Promise<void> {
const reader = await fetchData(url, body, accessToken, onError);
if (!reader) {
console.error("Reader is undefined!");
return;
}
await processStream(reader, onStart, onText, onError, shouldClose);
}
發展至今,SSE 已具有廣泛的的瀏覽器相容性,幾乎除 IE 之外的瀏覽器均已支援。
SSE (Server-Sent Events) 是基於 HTTP 協定的輕量級實時通訊技術。其核心特點是由伺服器主動推播資料到使用者端,而不需要使用者端頻繁請求。這樣的特點使得 SSE 在某些應用場景中成為了理想選擇,例如股票行情實時更新、網站活動紀錄檔推播、或聊天室中的實時線上人數統計。
然而,儘管 SSE 有很多優勢,如斷線重連機制、相對簡單的實現和輕量性等,但它也存在明顯的侷限性。首先,SSE 只支援單向通訊,即伺服器到使用者端的資料推播,而無法實現真正的雙向互動。其次,由於瀏覽器對並行連線數有限制,當需要大量的實時通訊連線時,SSE 可能會受到限制。
相對而言,WebSockets 提供了一個更加強大的雙向通訊機制,能夠滿足高並行、高吞吐量和低延遲的需求。因此,在選擇適合的實時通訊方案時,開發者需要根據應用的具體需求和場景來做出選擇。簡而言之,對於需要簡單、低頻率更新的場景,SSE 是一個非常不錯的選擇;而對於需要複雜、高頻、雙向互動的應用,WebSockets 可能更為合適。
最後,無論選擇哪種技術,都應對其優缺點有深入瞭解,以確保在特定場景下可以提供最佳的使用者體驗。
作者:京東科技 卞榮成
來源:京東雲開發者社群 轉載請註明來源