Websocket叢集解決方案

2022-11-09 21:00:16

最近在專案中在做一個訊息推播的功能,比如客戶下單之後通知給給對應的客戶傳送系統通知,這種訊息推播需要使用到全雙工的websocket推播訊息。

所謂的全雙工表示使用者端和伺服器端都能向對方傳送訊息。不使用同樣是全雙工的http是因為http只能由使用者端主動發起請求,服務接收後返回訊息。websocket建立起連線之後,使用者端和伺服器端都能主動向對方傳送訊息。

上一篇文章Spring Boot 整合單機websocket介紹了websocket在單機模式下進行訊息的傳送和接收:

使用者A使用者Bweb伺服器建立連線之後,使用者A傳送一條訊息到伺服器,伺服器再推播給使用者B,在單機系統上所有的使用者都和同一個伺服器建立連線,所有的session都儲存在同一個伺服器中。

單個伺服器是無法支撐幾萬人同時連線同一個伺服器,需要使用到分散式或者叢集將請求連線負載均衡到到不同的服務下。訊息的傳送方和接收方在同一個伺服器,這就和單體伺服器類似,能成功接收到訊息:

但負載均衡使用輪詢的演演算法,無法保證訊息傳送方和接收方處於同一個伺服器,當傳送方和接收方不是在同一個伺服器時,接收方是無法接受到訊息的:

websocket叢集問題解決思路

使用者端和伺服器端每次建立連線時候,會建立有狀態的對談session,伺服器的儲存維持連線的session。使用者端每次只能和叢集伺服器其中的一個伺服器連線,後續也是和該伺服器進行資料傳輸。

要解決叢集的問題,應該考慮session共用的問題,使用者端成功連線伺服器之後,其他伺服器也知道使用者端連線成功。

方案一:session 共用(不可行)

websocket類似的http是如何解決叢集問題的?解決方案之一就是共用session,使用者端登入伺服器端之後,將session資訊儲存在Redis資料庫中,連線其他伺服器時,從Redis獲取session,實際就是將session資訊儲存在Redis中,實現redis的共用。

session可以被共用的前提是可以被序列化,而websocketsession是無法被序列化的,httpsession記錄的是請求的資料,而websocketsession對應的是連線,連線到不同的伺服器,session也不同,無法被序列化。

方案二:ip hash(不可行)

http不使用session共用,就可以使用Nginx負載均衡的ip hash演演算法,使用者端每次都是請求同一個伺服器,使用者端的session都儲存在伺服器上,而後續請求都是請求該伺服器,都能獲取到session,就不存在分散式session問題了。

websocket相對http來說,可以由伺服器端主動推動訊息給使用者端,如果接收訊息的伺服器端和傳送訊息訊息的伺服器端不是同一個伺服器端,傳送訊息的伺服器端無法找到接收訊息對應的session,即兩個session不處於同一個伺服器端,也就無法推播訊息。如下圖所示:

解決問題的方法是將所有訊息的傳送方和接收方都處於同一個伺服器下,而訊息傳送方和接收方都是不確定的,顯然是無法實現的。

方案三:廣播模式

將訊息的傳送方和接收方都處於同一個伺服器下才能傳送訊息,那麼可以轉換一下思路,可以將訊息以訊息廣播的方式通知給所有的伺服器,可以使用訊息中介軟體釋出訂閱模式,訊息脫離了伺服器的限制,通過傳送到中介軟體,再傳送給訂閱的伺服器,類似廣播一樣,只要訂閱了訊息,都能接收到訊息的通知:

釋出者釋出訊息到訊息中介軟體,訊息中介軟體再將傳送給所有訂閱者:

廣播模式的實現

搭建單機 websocket

參考以前寫的websocket單機搭建 文章,先搭建單機websocket實現訊息的推播。

1. 新增依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 建立 ServerEndpointExporter 的 bean 範例

ServerEndpointExporter 的 bean 範例自動註冊 @ServerEndpoint 註解宣告的 websocket endpoint,使用springboot自帶tomcat啟動需要該設定,使用獨立 tomcat 則不需要該設定。

@Configuration
public class WebSocketConfig {
    //tomcat啟動無需該設定
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3. 建立伺服器端點 ServerEndpoint 和 使用者端端

  • 伺服器端點
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {

	private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

	private Session session;

	@OnOpen
	public void onOpen(Session session) throws SocketException {
		this.session = session;
		webSocketSet.put(this.session.getId(),this);

		log.info("【websocket】有新的連線,總數:{}",webSocketSet.size());
	}

	@OnClose
	public void onClose(){
		String id = this.session.getId();
		if (id != null){
			webSocketSet.remove(id);
			log.info("【websocket】連線斷開:總數:{}",webSocketSet.size());
		}
	}

	@OnMessage
	public void onMessage(String message){
		if (!message.equals("ping")){
			log.info("【wesocket】收到使用者端傳送的訊息,message={}",message);
			sendMessage(message);
		}
	}

	/**
	 * 傳送訊息
	 * @param message
	 * @return
	 */
	public void sendMessage(String message){
		for (WebSocket webSocket : webSocketSet.values()) {
			webSocket.session.getAsyncRemote().sendText(message);
		}
		log.info("【wesocket】傳送訊息,message={}", message);

	}

}
  • 使用者端點
<div>
    <input type="text" name="message" id="message">
    <button id="sendBtn">傳送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };

    ws.onmessage = function(evt) {
        console.log( "Received Message: " + evt.data);
        var p = $("<p>"+evt.data+"</p>")
        $("#content").prepend(p);
        $("#message").val("");
    };

    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };

    $("#sendBtn").click(function(){
        var aa = $("#message").val();
        ws.send(aa);
    })

</script>

伺服器端和使用者端中的OnOpenoncloseonmessage都是一一對應的。

  • 服務啟動後,使用者端ws.onopen呼叫伺服器端的@OnOpen註解的方法,儲存使用者端的session資訊,握手建立連線。
  • 使用者端呼叫ws.send傳送訊息,對應伺服器端的@OnMessage註解下面的方法接收訊息。
  • 伺服器端呼叫session.getAsyncRemote().sendText傳送訊息,對應的使用者端ws.onmessage接收訊息。

新增 controller

@GetMapping({"","index.html"})
public ModelAndView index() {
	ModelAndView view = new ModelAndView("index");
	return view;
}

效果展示

開啟兩個使用者端,其中的一個使用者端傳送訊息,另一個使用者端也能接收到訊息。

新增 RabbitMQ 中介軟體

這裡使用比較常用的RabbitMQ作為訊息中介軟體,而RabbitMQ支援釋出訂閱模式

新增訊息訂閱

交換機使用扇形交換機,訊息分發給每一條繫結該交換機的佇列。以伺服器所在的IP + 埠作為唯一標識作為佇列的命名,啟動一個服務,使用佇列繫結交換機,實現訊息的訂閱:

@Configuration
public class RabbitConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
    }

    @Bean
    public Queue psQueue() throws SocketException {
        // ip + 埠 為佇列名 
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }

    @Bean
    public Binding routingFirstBinding() throws SocketException {
        return BindingBuilder.bind(psQueue()).to(fanoutExchange());
    }
}

獲取伺服器IP和埠可以具體檢視Github原始碼,這裡就不做詳細描述了。

修改伺服器端點 ServerEndpoint

WebSocket新增訊息的接收方法,@RabbitListener 接收訊息,佇列名稱使用常數命名,動態佇列名稱使用 #{name},其中的nameQueuebean 名稱:

@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
  System.out.println(message);
  sendMessage(message);
}

然後再呼叫sendMessage方法傳送給所在連線的使用者端。

修改訊息傳送

WebSocket類的onMessage方法將訊息傳送改成RabbitMQ方式傳送:

@OnMessage
public void onMessage(String message){
  if (!message.equals("ping")){
    log.info("【wesocket】收到使用者端傳送的訊息,message={}",message);
    //sendMessage(message);
    if (rabbitTemplate == null) {
      rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
    }
    rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);
  }
}

訊息通知流程如下所示:

啟動兩個範例,模擬叢集環境

開啟idea的Edit Configurations

點選左上角的COPY,然後新增埠server.port=8081

啟動兩個服務,埠分別是80808081。在啟動8081埠的服務,將前端連線埠改成8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");

效果展示

原始碼

github原始碼

參考