大模型問答助手前端實現打字機效果

2023-10-30 12:01:52

1. 背景

隨著現代技術的快速發展,即時互動變得越來越重要。使用者不僅希望獲取資訊,而且希望以更直觀和實時的方式體驗它。這在聊天應用程式和其他實時通訊工具中尤為明顯,使用者習慣看到對方正在輸入的提示。

ChatGPT,作為 OpenAI 的代表性產品之一,不僅為使用者提供了強大的自然語言處理能力,而且關注使用者的整體互動體驗。在使用 ChatGPT 進行互動時,使用者可能已經注意到了一個細節:當它產生回覆時,回覆會像人類逐字輸入的方式逐漸出現,而不是一次性顯示完整答案。

這種打字效果給人一種彷彿與真人對話的感覺,進一步增強了其自然語言處理的真實感。一開始,許多開發者可能會誤以為這是通過 WebSockets 實現的,這是因為 WebSockets 是一種常用於實時通訊的技術。然而,仔細研究後,我們發現 ChatGPT 使用了一種不同的技術:基於 EventStream 的方法。更具體地說,它似乎是通過 SSE (Server-Sent Events) 來實現逐個字地推播答案的。

此外,考慮到 ChatGPT 的複雜性和其涉及的大量計算,響應時間可能會長於其他基於資料庫的簡單查詢。因此,採用 SSE 逐步推播結果的方式可以幫助減少使用者感到的等待時間,從而增強使用者體驗。

2. SSE 簡介

Server-Sent Events(通常簡稱為SSE)是一種允許伺服器向Web頁面傳送實時更新的技術。與WebSocket技術相比,SSE專門設計用於從伺服器到使用者端的單向通訊。這種單向性使其在某些場景中更為簡單和直觀。

2.1 主要特點

  1. 單向通訊:SSE 專為從伺服器到使用者端的單向通訊設計。使用者端不能通過SSE直接傳送資料到伺服器,但可以通過其他方法如AJAX與伺服器進行互動。

  2. 基於HTTP:SSE 基於 HTTP 協定執行,不需要新的協定或埠。這使得它能夠輕鬆地在現有的Web應用架構中使用,並且通過標準的HTTP代理和中介軟體進行支援。

  3. 自動重連:如果連線斷開,瀏覽器會自動嘗試重新連線到伺服器。

  4. 格式簡單:SSE 使用簡單的文字格式傳送訊息,每個訊息都以兩個連續的換行符分隔。

  5. 原生瀏覽器支援:許多現代瀏覽器(如 Chrome、Firefox 和 Safari)已原生支援SSE,但需要注意的是,某些瀏覽器,如Internet Explorer和早期的Edge版本,不支援SSE。

2.2 SSE 與 WebSockets

雖然 SSE 與 WebSockets 在某種程度上有些相似,但它們之間還存在一些關鍵差異,如下所示:

對比項 Server-Sent Events (SSE) WebSockets
基於協定 基於 HTTP,簡化了連線和互動的過程 通常基於 WS/WSS(基於TCP),更為靈活
通訊能力 單向通訊:僅伺服器向用戶端傳送訊息 雙向通訊能力
設定 設定簡單,易於理解和使用 需要更復雜的設定和理解
斷線與訊息追蹤 自帶的斷線重連和訊息跟蹤功能 通常需要手動處理或使用額外庫
資料格式 通常為文字,但可以傳送經過編碼/壓縮的二進位制訊息 支援文字和原始二進位制訊息
事件處理 支援多種自定義事件 基本訊息機制,不能像SSE那樣自定義事件型別
連線並行性 連線數可能受到 HTTP 版本的限制,尤其是在HTTP/1.1中 WebSocket被設計為支援更高的連線並行性
安全性 僅支援HTTP和HTTPS的安全機制 支援WS和WSS,可以在WSS上實現更強大的加密
瀏覽器相容性 大部分現代瀏覽器支援,但不是所有瀏覽器 幾乎所有現代瀏覽器都支援
開銷 由於基於HTTP,每次訊息可能有較大的頭部開銷 握手後,訊息頭部開銷相對較小

3. 伺服器端深入解析

3.1 SSE 的協定機制

Server-Sent Events(SSE)是一個基於 HTTP 的協定,允許伺服器單向地向瀏覽器推播資訊。為了成功地使用 SSE,伺服器和使用者端都必須遵循一定的規範和流程。

當用戶端(例如瀏覽器)發出請求訂閱 SSE 服務時,伺服器需要通過設定特定的響應頭部資訊來確認該請求。這些頭部資訊包括:

  • Content-Type: text/event-stream: 這表示返回的內容為事件流。

  • Cache-Control: no-cache: 這確保伺服器推播的訊息不會被快取,以保障訊息的實時性。

  • Connection: keep-alive: 這指示連線應始終保持開放,以便伺服器可以隨時傳送訊息。

3.2 訊息的格式和結構

SSE 使用簡單的文字格式來組織和傳送訊息。基本的訊息結構是由一系列行組成,每一行由欄位名、一個冒號和欄位值組成。

以下是訊息中可以使用的一些欄位及其用途:

  • event: 定義了事件的型別。這可以幫助使用者端確定如何處理接收到的訊息。

  • id: 提供事件的唯一識別符號。如果連線中斷,使用者端可以使用最後收到的事件 ID 來請求伺服器從某個點重新傳送訊息。

  • retry: 指定了當連線斷開時,使用者端應等待多少毫秒再嘗試重新連線。這為連線中斷和重連提供了一種機制。

  • data: 這是訊息的主體內容。它可以是任何 UTF-8 編碼的文字,而且可以跨多行。每行資料都會在使用者端解析時連線起來,中間使用換行符分隔。

為了確保訊息的正確和完整傳輸,伺服器通常在訊息的末尾新增一個空行,表示訊息的結束。

範例:

id: 123
event: update
data: {"message": "This is a test message"}


此外,SSE 也支援多條連續訊息的傳送。只要每條訊息之間使用兩個換行符隔開即可。

4. 使用者端實踐

接入 SSE 並不困難,尤其在使用者端這邊。主流瀏覽器提供了EventSourceAPI,使得與 SSE 伺服器端建立和維護連線變得異常簡單。

4.1 如何建立連線

首先,需要建立一個EventSource物件,它將代表與伺服器的持久連線。初始化時,可以為它提供一些選項,以滿足特定需求。

const options = {
  withCredentials: true  // 允許跨域請求攜帶憑證
};

// 建立一個 EventSource 物件以開始監聽
const eventSource = new EventSource('your_server_url', options);


在上面的程式碼中,withCredentials引數用於指示是否應該在請求中傳送憑證(例如 cookies)。這在跨域場景中可能會非常有用。

4.2 如何處理收到的事件

一旦與伺服器建立了連線,就可以開始監聽從伺服器傳送過來的事件。

  • 通用事件處理:
    預設情況下,EventSource物件會對三種基本的事件型別進行響應:openmessageerror。可以設定對應的處理常式來對它們進行響應。

    // 監聽連線開啟事件
    eventSource.onopen = function(event) {
      console.log('Connection to SSE server established!');
    };
    
    // 監聽標準訊息事件
    eventSource.onmessage = function(event) {
      console.log('Received data from server: ', event.data);
    };
    
    // 監聽錯誤事件
    eventSource.onerror = function(event) {
      console.error('An error occurred while receiving data:', event);
    };
    
    
    
  • 自定義事件處理:
    除了上述的基本事件外,伺服器還可能傳送自定義的事件型別。為了處理這些事件,需要使用addEventListener()方法。

    // 監聽一個名為 "update" 的自定義事件
    eventSource.addEventListener('update', function(event) {
      console.log('Received update event:', event.data);
    });
    
    
    

4.3 關閉連線

如果不再需要從伺服器接收事件,可以使用close方法關閉連線。

eventSource.close();


關閉連線後,將不再接收任何事件,除非再次初始化EventSource物件。


總結:使用EventSourceAPI,使用者端可以方便地與 SSE 伺服器互動,從而實時接收資料更新。這為建立響應迅速的 web 應用提供了極大的便利,同時避免了傳統的輪詢方式帶來的資源浪費。

5. 理論實踐

5.1 伺服器端

const http = require('http');
const fs = require('fs');

// 初始化 HTTP 伺服器
http.createServer((req, res) => {

  // 為了簡潔,將響應方法抽離成函數
  function serveFile(filePath, contentType) {
    fs.readFile(filePath, (err, data) => {
      if (err) {
        res.writeHead(500);
        res.end('Error loading the file');
      } else {
        res.writeHead(200, {'Content-Type': contentType});
        res.end(data);
      }
    });
  }

  function handleSSEConnection() {
    res.writeHead(200, { 
      'Content-Type': 'text/event-stream', 
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    });

    let id = 0;
    const intervalId = setInterval(() => {
      const message = {
        event: 'customEvent',
        id: id++,
        retry: 30000,
        data: { id, time: new Date().toISOString() }
      };
      for (let key in message) {
        if (key !== 'data') {
          res.write(`${key}: ${message[key]}\n`);
        } else {
          res.write(`data: ${JSON.stringify(message.data)}\n\n`);
        }
      }
    }, 1000);

    req.on('close', () => {
      clearInterval(intervalId);
      res.end();
    });
  }

  switch (req.url) {
    case '/':
      serveFile('index.html', 'text/html');
      break;
    case '/events':
      handleSSEConnection();
      break;
    default:
      res.writeHead(404);
      res.end();
      break;
  }

}).listen(3000);

console.log('Server listening on port 3000');


5.2 使用者端

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>SSE Demo</title>
</head>
<body>
  <h1>SSE Demo</h1>
  <button onclick="connectSSE()">建立 SSE 連線</button>
  <button onclick="closeSSE()">斷開 SSE 連線</button> 
  <br /><br />
  <div id="message"></div>

  <script>
    const messageElement = document.getElementById('message');
    let eventSource;

    // 連線 SSE
    function connectSSE() {
      eventSource = new EventSource('/events');

      eventSource.addEventListener('customEvent', handleReceivedMessage);
      eventSource.onopen = handleConnectionOpen;
      eventSource.onerror = handleConnectionError;
    }

    // 斷開 SSE 連線
    function closeSSE() {
      eventSource.close();
      appendMessage(`SSE 連線關閉,狀態${eventSource.readyState}`);
    }

    // 處理從伺服器端收到的訊息
    function handleReceivedMessage(event) {
      const data = JSON.parse(event.data);
      appendMessage(`${data.id} --- ${data.time}`);
    }

    // 連線建立成功的處理常式
    function handleConnectionOpen() {
      appendMessage(`SSE 連線成功,狀態${eventSource.readyState}`);
    }

    // 連線發生錯誤的處理常式
    function handleConnectionError() {
      appendMessage(`SSE 連線錯誤,狀態${eventSource.readyState}`);
    }

    // 將訊息新增到頁面上
    function appendMessage(message) {
      messageElement.innerHTML += `${message}<br />`;
    }
  </script>
</body>
</html>


將上面的兩份程式碼儲存為server.jsindex.html,並在命令列中執行node server.js啟動伺服器端,然後在瀏覽器中開啟http://localhost:3000即可看到 SSE 效果。

6. 業務實踐

6.1 存在問題

在業務真實使用場景中,基於SSE的方法存在一些問題和限制:

  1. 預設請求僅支援GET方法。當前端需要向後端傳遞引數時,引數只能拼接在請求的 URL 上,對於複雜的業務場景來說實現較為麻煩。

  2. 對於伺服器端返回的資料格式有固定要求,必須按照eventidretrydata的結構返回。

  3. 伺服器端傳送的資料可以在瀏覽器控制檯中檢視,這可能會暴露敏感資料,導致資料安全問題。

為了解決以上問題,並使其支援POST請求以及自定義的返回資料格式,我們可以使用以下技巧

6.2 優化技巧

利用 Fetch API 的流處理能力,我們可以實現對 SSE 的擴充套件:

/**
 * Utf8ArrayToStr: 將Uint8Array的資料轉為字串
 * @param {Uint8Array} array - Uint8Array資料
 * @return {string} - 轉換後的字串
 */
function Utf8ArrayToStr(array) {
    const decoder = new TextDecoder();
    return decoder.decode(array);
}

/**
 * fetchStream: 建立一個SSE連線,並支援多種HTTP請求方式
 * @param {string} url - 請求的URL地址
 * @param {object} params - 請求的引數,包括HTTP方法、頭部、主體內容等
 * @return {Promise} - 返回一個Promise物件
 */
const fetchStream = (url, params) => {
    const { onmessage, onclose, ...otherParams } = params;

    return fetch(url, otherParams)
        .then(response => {
            let reader = response.body?.getReader();

            return new ReadableStream({
                start(controller) {
                    function push() {
                        reader?.read().then(({ done, value }) => {
                            if (done) {
                                controller.close();
                                onclose?.();
                                return;
                            }
                            const decodedData = Utf8ArrayToStr(value);
                            console.log(decodedData);

                            onmessage?.(decodedData);

                            controller.enqueue(value);

                            push();
                        });
                    }
                    push();
                }
            });
        })
        .then(stream => {
            return new Response(stream, {
                headers: { "Content-Type": "text/html" }
            }).text();
        });
};

// 範例:呼叫fetchStream函數
fetchStream("/events", {
    method: "POST", // 使用POST方法
    headers: {
        "content-type": "application/json"
    },
    credentials: "include",
    body: JSON.stringify({
        // 這裡列出了一些範例資料,實際業務場景請替換為你的資料
        boxId: "exampleBoxId",
        sessionId: "exampleSessionId",
        queryContent: "exampleQueryContent"
    }),
    onmessage: res => {
        console.log(res); // 當接收到訊息時的回撥
    },
    onclose: () => {
        console.log("Connection closed."); // 當連線關閉時的回撥
    }
});



6.3 封裝外掛

我們定義一個名為eventStreamHandler.ts的檔案

// 定義請求主體的介面,需要根據具體的應用場景定義具體的屬性
interface RequestBody {
    // 範例屬性,具體屬性需要根據實際需求定義
    key?: string;
}

// 錯誤響應的結構
interface ErrorResponse {
    error: string;
    detail: string;
}

// 返回值型別定義
type TextStream = ReadableStreamDefaultReader<Uint8Array>;

// 獲取資料並返回TextStream
async function fetchData(
    url: string,
    body: RequestBody,
    accessToken: string,
    onError: (message: string) => void
): Promise<TextStream | undefined> {
    try {
        // 嘗試發起請求
        const response = await fetch(url, {
            method: "POST",
            cache: "no-cache",
            keepalive: true,
            headers: {
                "Content-Type": "application/json",
                Accept: "text/event-stream",
                Authorization: `Bearer ${accessToken}`,
            },
            body: JSON.stringify(body),
        });

        // 檢查是否有衝突,例如重複請求
        if (response.status === 409) {
            const error: ErrorResponse = await response.json();
            onError(error.detail);
            return undefined;
        }

        return response.body?.getReader();
    } catch (error) {
        onError(`Failed to fetch: ${error.message}`);
        return undefined;
    }
}

// 讀取流資料
async function readStream(reader: TextStream): Promise<string | null> {
    const result = await reader.read();
    return result.done ? null : new TextDecoder().decode(result.value);
}

// 處理文字流資料
async function processStream(
    reader: TextStream,
    onStart: () => void,
    onText: (text: string) => void,
    onError: (error: string) => void,
    shouldClose: () => boolean
): Promise<void> {
    try {
        // 開始處理資料
        onStart();
        
        while (true) {
            if (shouldClose()) {
                await reader.cancel();
                return;
            }
            const text = await readStream(reader);
            if (text === null) break;

            onText(text);
        }
    } catch (error) {
        onError(`Processing stream failed: ${error.message}`);
    }
}

/**
 * 主要的匯出函數,用於處理流式文字資料。
 * 
 * @param url 請求的URL。
 * @param body 請求主體內容。
 * @param accessToken 存取令牌。
 * @param onStart 開始處理資料時的回撥。
 * @param onText 接收到資料時的回撥。
 * @param onError 錯誤處理回撥。
 * @param shouldClose 判斷是否需要關閉流的函數。
 */
export async function streamText(
    url: string,
    body: RequestBody,
    accessToken: string,
    onStart: () => void,
    onText: (text: string) => void,
    onError: (error: string) => void,
    shouldClose: () => boolean
): Promise<void> {
    const reader = await fetchData(url, body, accessToken, onError);
    
    if (!reader) {
        console.error("Reader is undefined!");
        return;
    }

    await processStream(reader, onStart, onText, onError, shouldClose);
}


7. 相容性

發展至今,SSE 已具有廣泛的的瀏覽器相容性,幾乎除 IE 之外的瀏覽器均已支援。

8. 總結

SSE (Server-Sent Events) 是基於 HTTP 協定的輕量級實時通訊技術。其核心特點是由伺服器主動推播資料到使用者端,而不需要使用者端頻繁請求。這樣的特點使得 SSE 在某些應用場景中成為了理想選擇,例如股票行情實時更新、網站活動紀錄檔推播、或聊天室中的實時線上人數統計。

然而,儘管 SSE 有很多優勢,如斷線重連機制、相對簡單的實現和輕量性等,但它也存在明顯的侷限性。首先,SSE 只支援單向通訊,即伺服器到使用者端的資料推播,而無法實現真正的雙向互動。其次,由於瀏覽器對並行連線數有限制,當需要大量的實時通訊連線時,SSE 可能會受到限制。

相對而言,WebSockets 提供了一個更加強大的雙向通訊機制,能夠滿足高並行、高吞吐量和低延遲的需求。因此,在選擇適合的實時通訊方案時,開發者需要根據應用的具體需求和場景來做出選擇。簡而言之,對於需要簡單、低頻率更新的場景,SSE 是一個非常不錯的選擇;而對於需要複雜、高頻、雙向互動的應用,WebSockets 可能更為合適。

最後,無論選擇哪種技術,都應對其優缺點有深入瞭解,以確保在特定場景下可以提供最佳的使用者體驗。

作者:京東科技 卞榮成

來源:京東雲開發者社群 轉載請註明來源