最近在做一款輕量級IM產品,後端技術棧框架使用了nodejs + nestjs作為伺服器端。同時,還需要滿足一個服務同時支援HTTP服務呼叫以及WebSocket服務呼叫,此文主要記錄本次搭建過程,以及基本的伺服器端設計。
node v14.17.5
nestjs 全域性命令列工具(npm i -g @nestjs/cli
)
本文不再詳細介紹nestjs各種概念,請參考:First steps | NestJS - A progressive Node.js framework
直接建立一個Demo專案:
nest new nest-http-socket-demo
等待專案完成以後(這個過程可能會持續比較久,因為建立好目錄結構以後還會進行包安裝),結構如下:
nest-http-websocket-demo
├─ .eslintrc.js
├─ .gitignore
├─ .prettierrc
├─ README.md
├─ nest-cli.json
├─ node_modules
│ └─ ... ...
├─ package.json
├─ src
│ ├─ app.controller.spec.ts
│ ├─ app.controller.ts
│ ├─ app.module.ts
│ ├─ app.service.ts
│ └─ main.ts
├─ test
│ ├─ app.e2e-spec.ts
│ └─ jest-e2e.json
├─ tsconfig.build.json
├─ tsconfig.json
└─ yarn.lock
初始的目錄結構可能不太符合我們的期望,我們對目錄結構進行適當的調整。主要分為幾個目錄:
調整後的src目錄結構如下:
- src
├─ base
├─ common
├─ entity
└─ module
在規劃API之前,我們先設計定義一些伺服器端基本資料結構。
眾所周知,一般的伺服器端都會對原始返回資料進行一定的包裝,增加返回碼、錯誤訊息等來明確的指出具體的錯誤內容,在我們的服務也不例外。於是,我們設計如下的結構體:
export interface ServerResponseWrapper {
/**
* 伺服器端返回碼
*/
returnCode: string;
/**
* 錯誤資訊(如有,例如返回碼非成功碼)
*/
errorMessage?: string;
/**
* 返回資料(如有)
*/
data?: any;
}
對於該結構來說,後續使用者端也會使用相同的資料結構進行解析,所以我們可以考慮將該檔案放在src/common中。
下面是一些常見的返回資料(純樣例):
// 獲取使用者基本資訊成功
{
"returnCode": "SUC00000",
"data": {
"username": "w4ngzhen",
"lastLoginTime": "2022-11-22 11:50:22.000"
}
}
// 獲取使用者名稱稱出錯(沒有提供對應的userId)
{
"returnCode": "ERR40000",
"errorMessage": "user id is empty.",
}
// 獲取伺服器端時間
{
"returnCode": "SUC0000",
"data": "2022-11-22 11:22:33.000"
}
為了統一返回碼,我們在定義了一個ReturnCode實體類,由該類統一封裝返回碼。作為外部會涉及瞭解到的內容,我們也將該類放置於src/common中,且匯出常用的錯誤碼,程式碼如下:
export class ReturnCode {
private readonly _preCode: 'SUC' | 'ERR';
private readonly _subCode: string;
private readonly _statusCode: number;
get codeString(): string {
return `${this._preCode}${this._subCode}`;
}
get statusCode(): number {
return this._statusCode;
}
constructor(prefix: 'SUC' | 'ERR', subCode: string, statusCode: number) {
this._preCode = prefix;
this._subCode = subCode;
this._statusCode = statusCode;
}
}
export const SUCCESS = new ReturnCode('SUC', '00000', 200);
export const ERR_NOT_FOUND = new ReturnCode('ERR', '40400', 404);
為了便於在服務呼叫過程中,能夠按照具體的業務層面進行異常丟擲。我們定義一個名為BizException的類來封裝業務異常。對於外部系統來說,該異常並不可見,所以我們把該類放置於src/base中:
import {ReturnCode} from "../common/return-code";
export class BizException {
private readonly _errorCode: ReturnCode;
private readonly _errorMessage: string;
get errorCode(): ReturnCode {
return this._errorCode;
}
get errorMessage(): string {
return this._errorMessage;
}
protected constructor(errorEntity: ReturnCode, errorMessage: string) {
this._errorMessage = errorMessage;
this._errorCode = errorEntity;
}
static create(errEntity: ReturnCode, errMessage?: string): BizException {
return new BizException(errEntity, errMessage);
}
}
接下來,我們為伺服器規劃兩個API,分別體現HTTP服務和WebSocket服務。
首先,我們設計一個簡單使用者資訊查詢服務介面。該介面可以根據傳遞而來的使用者ID(userId)返回對應的使用者資訊:
GET /users?userId=${userId}
為了實現上述介面,我們按照如下流程進行API搭建:
// src/entity/user/user.dto.ts
export interface UserDto {
userId: string;
username: string;
age: number;
}
// src/module/user/user.service.ts
import {Injectable} from '@nestjs/common';
import {UserDto} from "../../entity/user/user.dto";
@Injectable()
export class UserService {
async getUserById(userId: string): Promise<UserDto> {
// 測試資料
const demoData: UserDto[] = [
{
userId: 'tom',
username: 'Tom',
age: 10
},
{
userId: 'jerry',
username: 'Jerry',
age: 11
}
];
return demoData.find(u => u.userId === userId);
}
}
user.controller.ts
),增加GET /users
介面,請求引數並呼叫服務:import {Controller, Get, Param, Query} from '@nestjs/common';
import {UserService} from './user.service';
import {UserDto} from "../../entity/user/user.dto";
@Controller("users")
export class UserController {
constructor(private readonly userService: UserService) {
}
@Get()
async getHello(@Query('userId') userId: string): Promise<UserDto> {
return this.userService.getUserById(userId);
}
}
src/module/user/user.module.ts
):import { Module } from '@nestjs/common';
import { UserController } from './user.controller';
import { UserService } from './user.service';
@Module({
imports: [],
controllers: [UserController],
providers: [UserService],
})
export class UserModule {}
import { AppService } from './app.service';
+import {UserModule} from "./module/user/user.module";
@Module({
- imports: [],
+ imports: [UserModule],
controllers: [AppController],
providers: [AppService],
})
完成上述操作以後,我們就可以啟動服務進行驗證了:
上面的介面返回可以看出,Controller返回是什麼樣的結構體,前端請求到的資料就是什麼結構,但我們希望將資料按照ServerResponseWrapper結構進行封裝。在nestjs中,可以通過實現來自@nestjs/common
中的NestInterceptor
介面來編寫我們自己的響應攔截,統一處理響應來實現前面的需求。按照我們之前規劃,我們首先在src/base中建立interceptor目錄,然後在裡面建立http-service.response.interceptor.ts
,內容如下:
// src/base/interceptor/http-service.response.interceptor.ts
import {CallHandler, ExecutionContext, NestInterceptor} from "@nestjs/common";
import {map, Observable} from "rxjs";
import {ServerResponseWrapper} from "../../common/server-response-wrapper";
import {SUCCESS} from "../../common/return-code";
/**
* 全域性Http服務響應攔截器
* 該Interceptor在main中通過
* app.useGlobalInterceptors 來全域性引入,
* 僅處理HTTP服務成功響應攔截,異常是不會進入該攔截器
*/
export class HttpServiceResponseInterceptor implements NestInterceptor {
intercept(context: ExecutionContext,
next: CallHandler):
Observable<any> | Promise<Observable<any>> {
return next.handle().pipe(map(data => {
// 進入該攔截器,說明沒有異常,使用成功返回
const resp: ServerResponseWrapper = {
returnCode: SUCCESS.codeString,
data: data
};
return resp;
}))
}
}
建立完成後,我們在main入口中,需要將該響應攔截器註冊到全域性中:
// src/main.ts
async function bootstrap() {
const app = await NestFactory.create(AppModule);
+
+ // 增加HTTP服務的成功響應攔截器
+ app.useGlobalInterceptors(new HttpServiceResponseInterceptor());
+
await app.listen(3000);
}
bootstrap();
完成設定以後,我們可以再次呼叫API來檢視結果:
可以看到,儘管我們的Controller返回的是一個實際資料結構(Promise也適用),但是經過響應攔截器的處理,我們完成了對響應體的包裹封裝。
上述我們完成一個呼叫,並對響應成功的資料進行了包裹,但面對異常情況同樣適用嗎?如果不適用又需要如何處理呢?
首先,我們增加一個專門處理欄位錯誤的錯誤碼ReturnCode:
// src/common/return-code.ts
export const SUCCESS = new ReturnCode('SUC', '00000', 200);
+export const ERR_REQ_FIELD_ERROR = new ReturnCode('ERR', '40000', 400);
export const ERR_NOT_FOUND = new ReturnCode('ERR', '40400', 404);
然後,我們在UserService中適當修改一下getUserById的實現,加入userId判空判斷,並在為空的時候,丟擲業務異常(這個過程我們順便安裝了lodash):
+import * as _ from 'lodash';
+import {BizException} from "../../common/biz-exception";
+import {ERR_REQ_FIELD_ERROR} from "../../common/return-code";
@Injectable()
export class UserService {
async getUserById(userId: string): Promise<UserDto> {
+ if (_.isEmpty(userId)) {
+ throw BizException.create(ERR_REQ_FIELD_ERROR, 'user id is empty');
+ }
... ...
}
}
完成上述修改後,我們嘗試發請求時候,故意不填寫userId,得到如下的結果:
可以看到,儘管nestjs幫助我們進行一定的封裝,但是結構體與我們一開始定義的ServerResponseWrapper是不一致的。為了保持一致,我們需要接管nestjs的例外處理,並轉換為我們自己的wrapper結構,而接管的方式則是建立一個實現ExceptionFilter介面的類(按照路徑劃分,我們將這個類所在檔案http-service.exception.filter.ts
存放於src/base/filter目錄下):
import {ArgumentsHost, Catch, ExceptionFilter, HttpException} from "@nestjs/common";
import {ServerResponseWrapper} from "../../common/server-response-wrapper";
import {BizException} from "../../common/biz-exception";
/**
* 全域性Http服務的例外處理,
* 該Filter在main中通過
* app.useGlobalExceptionFilter來全域性引入,
* 僅處理HTTP服務
*/
@Catch()
export class HttpServiceExceptionFilter implements ExceptionFilter {
catch(exception: any, host: ArgumentsHost): any {
// 進入該攔截器,說明http呼叫中存在異常,需要解析異常,並返回統一處理
let responseWrapper: ServerResponseWrapper;
let httpStatusCode: number;
if (exception instanceof BizException) {
// 業務層Exception
responseWrapper = {
returnCode: exception.errorCode.codeString,
errorMessage: exception.errorMessage
}
httpStatusCode = exception.errorCode.statusCode;
} else if (exception instanceof HttpException) {
// 框架層的Http異常
responseWrapper = {
returnCode: 'IM9009',
errorMessage: exception.message,
}
httpStatusCode = exception.getStatus();
} else {
// 其他錯誤
responseWrapper = {
returnCode: 'IM9999',
errorMessage: 'server unknown error: ' + exception.message,
};
httpStatusCode = 500;
}
// 該攔截器處理HTTP服務的異常,所以手動切換到HTTP Host
// 並獲取響應response,進行HTTP響應的寫入
const httpHost = host.switchToHttp();
const response = httpHost.getResponse();
response.status(httpStatusCode).json(responseWrapper);
}
}
該類的核心點在於,對捕獲到的異常進行解析後,我們會通過引數ArgumentsHost來獲取實際的HTTP Host,並從中獲取response物件,呼叫相關支援的方法來控制響應response的內容(http狀態碼以及響應體內容)。
最後,我們依然在main裡面進行註冊設定:
+import {HttpServiceExceptionFilter} from "./base/filter/http-service.exception.filter";
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// 增加HTTP服務的成功響應攔截器
app.useGlobalInterceptors(new HttpServiceResponseInterceptor());
+ // 增加HTTP服務的異常過濾器,進行響應包裹
+ app.useGlobalFilters(new HttpServiceExceptionFilter());
await app.listen(3000);
}
完成開發設定以後,我們重啟服務,通過呼叫介面可以看到對應異常返回:
在nestjs中想要整合WebSocket服務也很容易。
首先,我們使用一個裝飾器@WebSocketGateway()
來表明一個類是一個WebSocket的閘道器(Gateway),這個裝飾器可以指定WebSocket服務的埠等資訊。通常情況下,我們可以設定與HTTP服務不一樣的埠,這樣我們就可以在一個臺服務上通過不同的埠暴露HTTP和WebSocket服務。當然,這不是必須,只是為了更好的區分服務。
其次,我們需要明白在nestjs可以使用ws或者socket.io兩種具體實現的websocket平臺。什麼是具體平臺?簡單來講,nestjs只負責設定一個標準的WebSocket閘道器規範,提供通用的API、介面、裝飾器等,各個平臺則是根據nestjs提供的規範進行實現。在本例中,我們選擇使用socket.io作為nestjs上WebSocket具體的實現,因為socket.io是一個比較著名websocket庫,同時支援伺服器端和使用者端,並且在使用者端/伺服器端均內建支援了"請求 - 響應"一來一回機制。
依賴安裝
nestjs中的websocket是一個獨立的模組,且我們選取了socket.io作為websocket的實現,所以我們需要首先安裝一下的基礎模組:
yarn add @nestjs/websockets @nestjs/platform-socket.io
閘道器建立
websocket的相關內容,我們同樣作為一種模組進行編寫。於是,我們在src/module/目錄中建立websocket資料夾,並在裡面建立一個檔案:my-websocket.gateway.ts,編寫WS閘道器MyWebSocketGateway類的內容:
import {WebSocketGateway} from "@nestjs/websockets";
@WebSocketGateway(4000, {
transports: ['websocket']
})
export class MyWebSocketGateway {
}
一個簡單的WebSocket閘道器就建立完成了。我們首先設定了WebSocket服務的埠號為4000(與HTTP服務的3000隔離開);其次,需要特別提一下transports引數,可選擇的transport有兩種:
polling(HTTP長連線輪詢)
該機制由連續的 HTTP 請求組成:
GET
POST
由於傳輸的性質,連續的發出可以在同一 HTTP 請求中連線和傳送。
也就是說,polling本質上是利用HTTP請求+輪詢來完成所謂的雙工通訊,在某些古老的沒有實現真正WebSocket協定的瀏覽器作為一種實現方案。
websocket(網路通訊端)
WebSocket 傳輸由WebSocket 連線組成,該連線在伺服器和使用者端之間提供雙向和低延遲的通訊通道。這是真正的長連線雙工通訊協定。
所以,在通訊的過程中,伺服器端與使用者端要保持相匹配的傳輸協定。
模組建立註冊
同樣的,我們在src/module/websocket中建立一個my-websocket.module.ts檔案,內容如下:
import {MyWebSocketGateway} from "./my-websocket.gateway";
import {Module} from "@nestjs/common";
@Module({
providers: [MyWebSocketGateway]
})
export class MyWebSocketModule {
}
主要內容是將MyWebSocketGateway註冊到模組中。
最後我們將MyWebSocket模組註冊到根模組中:
+import {MyWebSocketModule} from "./module/websocket/my-websocket.module";
@Module({
- imports: [UserModule],
+ imports: [UserModule, MyWebSocketModule],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
我們先設定這樣一個場景:使用者端連線上WebSocket服務後,可以給伺服器端傳送一份JSON資料(內容加下方),伺服器端校驗該資料後,在控制檯列印資料。
{
"name": "w4ngzhen"
}
對於伺服器端來說,我們首先需要訂閱事件(subscribe),假設傳送JSON資料的事件為hello
,那麼我們可以通過如下的方式來進行訂閱:
export class MyWebSocketGateway {
@SubscribeMessage('hello')
hello(@MessageBody() reqData: { name: string }) {
if (!reqData || !reqData.name) {
throw BizException.create(ERR_REQ_FIELD_ERROR, 'data is empty');
}
console.log(JSON.stringify(reqData));
}
}
測試WebSocket,可以使用postman來進行,只需要建立個一WebSocket的請求,在postman中按下CTRL+N(macOS為command+N),可以選擇WebSocket請求:
建立後,需要注意,由於我們nestjs整合的WebSocket實現使用的socket.io,所以使用者端需要匹配對應的實現(這點主要是為了匹配」請求-響應「一來一回機制)
完成設定後,我們可以採用如下的步驟進行事件傳送:
傳送完成後,就會看到postman的列印和nodejs服務控制檯的列印,符合我們的預期:
當然,我前面提到過socket.io支援事件一來一回的請求響應模式。在nestjs中的WebSocket閘道器,只需要在對應的請求返回值即可:
@SubscribeMessage('hello')
hello(@MessageBody() reqData: { name: string }) {
if (!reqData || !reqData.name) {
throw BizException.create(ERR_REQ_FIELD_ERROR, 'data is empty');
}
console.log(JSON.stringify(reqData));
+ return 'received reqData';
}
在postman的地方,我們需要傳送的時候勾選上Acknowledgement
:
完成以後,我們重新連線服務並行送資料,就可以看到一條完整的事件處理鏈路了:
至此,我們就完成了在Nestjs整合一個基礎的WebSocket服務了。
當然,我們的工作還沒有結束。在前面我們對HTTP服務編寫了成功響應攔截器以及異常過濾器,接下來,我們按照同樣的方式編寫WebSocket的相關處理。
對於整合在nestjs中的WebSocket服務,想要編寫並設定一個成功響應攔截器並不複雜,沒有什麼坑。
首先,我們仿照著http-service.response.interceptor.ts,編寫一個幾乎完全一樣的ws-service.response.interceptor.ts,與HTTP的成功響應攔截器放在相同目錄src/base/interceptor中:
// src/base/interceptor/ws-service.response.interceptor.ts
import {CallHandler, ExecutionContext, NestInterceptor} from "@nestjs/common";
import {map, Observable} from "rxjs";
import {ServerResponseWrapper} from "../../common/server-response-wrapper";
import {SUCCESS} from "../../common/return-code";
/**
* 全域性WebSocket服務響應攔截器
* 該Interceptor在閘道器中通過裝飾器 @UseInterceptors 使用
* 僅處理WebSocket服務成功響應攔截,異常是不會進入該攔截器
*/
export class WsServiceResponseInterceptor implements NestInterceptor {
intercept(context: ExecutionContext,
next: CallHandler):
Observable<any> | Promise<Observable<any>> {
return next.handle().pipe(map(data => {
// 進入該攔截器,說明沒有異常,使用成功返回
const resp: ServerResponseWrapper = {
returnCode: SUCCESS.codeString,
data: data
};
return resp;
}))
}
}
其次,與HTTP註冊攔截器不同的是,nestjs中註冊WebSocket的攔截器,需要在閘道器類上使用裝飾器進行:
+ // 安裝WebSocket成功響應攔截器
+ @UseInterceptors(new WsServiceResponseInterceptor())
@WebSocketGateway(4000, {
transports: ['websocket']
})
export class MyWebSocketGateway {
... ...
設定完成以後,我們重啟服務,再次使用postman進行WebSocket事件請求,則會看到經過包裝後的響應體:
當然,我們嘗試不傳送任何的資料。理論上,則會進入校驗流程不通過的場景,丟擲BizException。在實際的傳送中,我們會看到,postman無法接受到異常:
在伺服器端會看到一個異常報錯:
對於這個問題,我們的需求是無論是否有異常,都需要使用ServerResponseWrapper進行包裹。與HTTP不同的是,WebSocket的異常過濾器需要實現WsExceptionFilter
介面,實現該介面的catch方法:
import {ArgumentsHost, Catch, ExceptionFilter, HttpException, WsExceptionFilter} from "@nestjs/common";
import {ServerResponseWrapper} from "../../common/server-response-wrapper";
import {BizException} from "../../common/biz-exception";
/**
* 全域性WebSocket服務的例外處理,
* 該Filter在閘道器中通過 使用 @UseFilters 來進行註冊
* 僅處理WebSocket閘道器服務
*/
@Catch()
export class WsServiceExceptionFilter implements WsExceptionFilter {
catch(exception: any, host: ArgumentsHost): any {
// 進入該攔截器,說明http呼叫中存在異常,需要解析異常,並返回統一處理
let responseWrapper: ServerResponseWrapper;
if (exception instanceof BizException) {
// 業務層Exception
responseWrapper = {
returnCode: exception.errorCode.codeString,
errorMessage: exception.errorMessage
}
} else {
// 其他錯誤
responseWrapper = {
returnCode: 'IM9999',
errorMessage: 'server unknown error: ' + exception.message,
};
}
// 對異常進行封裝以後,需要讓框架繼續進行呼叫處理,才能正確的響應給使用者端
// 此時,需要提取到callback這個函數
// 參考:https://stackoverflow.com/questions/61795299/nestjs-return-ack-in-exception-filter
const callback = host.getArgByIndex(2);
if (callback && typeof callback === 'function') {
callback(responseWrapper);
}
}
}
這個Filter與HTTP服務中的異常過濾器差異點主要三點:
1)WebSocket中不存在HTTP狀態碼且不存在HTTP異常,所以我們只需要解析區分BizException與非BizException。
2)WebSocket的異常過濾器中,想要繼續後的資料處理,需要在方法返回前,從host中取到第三個引數物件(索引值為2),該值是一個回撥函數,將處理後的資料作為引數,呼叫該callback方法,框架才能繼續處理。—— WebSocket異常過濾器最終返回的關鍵點。
// 對異常進行封裝以後,需要讓框架繼續進行呼叫處理,才能正確的響應給使用者端
// 此時,需要提取到callback這個函數
// 參考:https://stackoverflow.com/questions/61795299/nestjs-return-ack-in-exception-filter
const callback = host.getArgByIndex(2);
if (callback && typeof callback === 'function') {
callback(responseWrapper);
}
3)註冊該異常過濾器同樣和WebSocket的響應攔截器一樣,需要在閘道器類上使用@UseFilters
裝飾器。
// 安裝WebSocket成功響應攔截器
@UseInterceptors(new WsServiceResponseInterceptor())
+ // 安裝WebSocket異常過濾器
+ @UseFilters(new WsServiceExceptionFilter())
@WebSocketGateway(4000, {
transports: ['websocket']
})
完成該設定後,我們再次重啟服務,使用postman,可以看到wrapper包裝後的效果:
本次demo已經提交至github
w4ngzhen/nest-http-websocket-demo (github.com)
同時,按照每一階段進行了適配提交:
add: 新增WebSocket異常過濾器並註冊到WebSocket閘道器中。
add: 新增WebSocket成功響應攔截器並註冊到WebSocket閘道器中。
modify: 新增WebSocket的事件響應資料。
modify: 增減對事件」hello「的處理,並在控制檯列印請求。
add: 建立一個基本的WebSocket閘道器以及將閘道器模組進行註冊。
add: 增加nestjs websocket依賴、socket.io平臺實現。
add: 新增HTTP服務異常過濾器,對異常進行解析並返回Wrapper包裹資料。
modify: 修改獲取使用者資訊邏輯,加入userId判空檢查。
add: 新增HTTP服務成功響應攔截器,對返回體進行統一Wrapper包裹。
modify: 註冊user模組到app主模組。
add: 新增使用者User模組相關的dto定義、service、controller以及module。
add: 新增ServerResponseWrapper作為伺服器端響應資料封裝;新增返回碼類,統一定義返回碼;新增業務異常類,封裝業務異常。
init: 初始化專案結構
我會逐步完善這個demo,接入各種常用的模組(資料庫、Redis、S3-ECS等)。本文是本demo的初始階段,已經發佈於1.0版本tag。