最近在專案中在做一個訊息推播的功能,比如客戶下單之後通知給給對應的客戶傳送系統通知,這種訊息推播需要使用到全雙工的websocket
推播訊息。
所謂的全雙工表示使用者端和伺服器端都能向對方傳送訊息。不使用同樣是全雙工的
http
是因為http
只能由使用者端主動發起請求,服務接收後返回訊息。websocket
建立起連線之後,使用者端和伺服器端都能主動向對方傳送訊息。
上一篇文章Spring Boot 整合單機websocket介紹了websocket
在單機模式下進行訊息的傳送和接收:
使用者A
和使用者B
和web
伺服器建立連線之後,使用者A
傳送一條訊息到伺服器,伺服器再推播給使用者B
,在單機系統上所有的使用者都和同一個伺服器建立連線,所有的session
都儲存在同一個伺服器中。
單個伺服器是無法支撐幾萬人同時連線同一個伺服器,需要使用到分散式或者叢集將請求連線負載均衡到到不同的服務下。訊息的傳送方和接收方在同一個伺服器,這就和單體伺服器類似,能成功接收到訊息:
但負載均衡使用輪詢的演演算法,無法保證訊息傳送方和接收方處於同一個伺服器,當傳送方和接收方不是在同一個伺服器時,接收方是無法接受到訊息的:
使用者端和伺服器端每次建立連線時候,會建立有狀態的對談session
,伺服器的儲存維持連線的session
。使用者端每次只能和叢集伺服器其中的一個伺服器連線,後續也是和該伺服器進行資料傳輸。
要解決叢集的問題,應該考慮session共用
的問題,使用者端成功連線伺服器之後,其他伺服器也知道使用者端連線成功。
和websocket
類似的http
是如何解決叢集問題的?解決方案之一就是共用session
,使用者端登入伺服器端之後,將session
資訊儲存在Redis
資料庫中,連線其他伺服器時,從Redis
獲取session
,實際就是將session
資訊儲存在Redis
中,實現redis的共用。
session
可以被共用的前提是可以被序列化,而websocket
的session
是無法被序列化的,http
的session
記錄的是請求的資料,而websocket
的session
對應的是連線,連線到不同的伺服器,session
也不同,無法被序列化。
http
不使用session
共用,就可以使用Nginx
負載均衡的ip hash
演演算法,使用者端每次都是請求同一個伺服器,使用者端的session
都儲存在伺服器上,而後續請求都是請求該伺服器,都能獲取到session
,就不存在分散式session
問題了。
websocket
相對http
來說,可以由伺服器端主動推動訊息給使用者端,如果接收訊息的伺服器端和傳送訊息訊息的伺服器端不是同一個伺服器端,傳送訊息的伺服器端無法找到接收訊息對應的session
,即兩個session不處於同一個伺服器端,也就無法推播訊息。如下圖所示:
解決問題的方法是將所有訊息的傳送方和接收方都處於同一個伺服器下,而訊息傳送方和接收方都是不確定的,顯然是無法實現的。
將訊息的傳送方和接收方都處於同一個伺服器下才能傳送訊息,那麼可以轉換一下思路,可以將訊息以訊息廣播的方式通知給所有的伺服器,可以使用訊息中介軟體釋出訂閱模式,訊息脫離了伺服器的限制,通過傳送到中介軟體,再傳送給訂閱的伺服器,類似廣播一樣,只要訂閱了訊息,都能接收到訊息的通知:
釋出者釋出訊息到訊息中介軟體,訊息中介軟體再將傳送給所有訂閱者:
參考以前寫的websocket單機搭建 文章,先搭建單機websocket
實現訊息的推播。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
ServerEndpointExporter 的 bean 範例自動註冊 @ServerEndpoint 註解宣告的 websocket endpoint,使用springboot自帶tomcat啟動需要該設定,使用獨立 tomcat 則不需要該設定。
@Configuration
public class WebSocketConfig {
//tomcat啟動無需該設定
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {
private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();
private Session session;
@OnOpen
public void onOpen(Session session) throws SocketException {
this.session = session;
webSocketSet.put(this.session.getId(),this);
log.info("【websocket】有新的連線,總數:{}",webSocketSet.size());
}
@OnClose
public void onClose(){
String id = this.session.getId();
if (id != null){
webSocketSet.remove(id);
log.info("【websocket】連線斷開:總數:{}",webSocketSet.size());
}
}
@OnMessage
public void onMessage(String message){
if (!message.equals("ping")){
log.info("【wesocket】收到使用者端傳送的訊息,message={}",message);
sendMessage(message);
}
}
/**
* 傳送訊息
* @param message
* @return
*/
public void sendMessage(String message){
for (WebSocket webSocket : webSocketSet.values()) {
webSocket.session.getAsyncRemote().sendText(message);
}
log.info("【wesocket】傳送訊息,message={}", message);
}
}
<div>
<input type="text" name="message" id="message">
<button id="sendBtn">傳送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
var ws = new WebSocket("ws://127.0.0.1:8080/message");
ws.onopen = function(evt) {
console.log("Connection open ...");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
var p = $("<p>"+evt.data+"</p>")
$("#content").prepend(p);
$("#message").val("");
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
$("#sendBtn").click(function(){
var aa = $("#message").val();
ws.send(aa);
})
</script>
伺服器端和使用者端中的OnOpen
、onclose
、onmessage
都是一一對應的。
ws.onopen
呼叫伺服器端的@OnOpen
註解的方法,儲存使用者端的session資訊,握手建立連線。ws.send
傳送訊息,對應伺服器端的@OnMessage
註解下面的方法接收訊息。session.getAsyncRemote().sendText
傳送訊息,對應的使用者端ws.onmessage
接收訊息。@GetMapping({"","index.html"})
public ModelAndView index() {
ModelAndView view = new ModelAndView("index");
return view;
}
開啟兩個使用者端,其中的一個使用者端傳送訊息,另一個使用者端也能接收到訊息。
這裡使用比較常用的RabbitMQ
作為訊息中介軟體,而RabbitMQ
支援釋出訂閱模式:
交換機使用扇形交換機,訊息分發給每一條繫結該交換機的佇列。以伺服器所在的IP + 埠作為唯一標識作為佇列的命名,啟動一個服務,使用佇列繫結交換機,實現訊息的訂閱:
@Configuration
public class RabbitConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
}
@Bean
public Queue psQueue() throws SocketException {
// ip + 埠 為佇列名
String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
return new Queue("ps_" + ip);
}
@Bean
public Binding routingFirstBinding() throws SocketException {
return BindingBuilder.bind(psQueue()).to(fanoutExchange());
}
}
獲取伺服器IP和埠可以具體檢視Github原始碼,這裡就不做詳細描述了。
在WebSocket
新增訊息的接收方法,@RabbitListener
接收訊息,佇列名稱使用常數命名,動態佇列名稱使用 #{name}
,其中的name
是Queue
的bean
名稱:
@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
System.out.println(message);
sendMessage(message);
}
然後再呼叫sendMessage
方法傳送給所在連線的使用者端。
在WebSocket
類的onMessage
方法將訊息傳送改成RabbitMQ
方式傳送:
@OnMessage
public void onMessage(String message){
if (!message.equals("ping")){
log.info("【wesocket】收到使用者端傳送的訊息,message={}",message);
//sendMessage(message);
if (rabbitTemplate == null) {
rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
}
rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);
}
}
訊息通知流程如下所示:
開啟idea的Edit Configurations
:
點選左上角的COPY,然後新增埠server.port=8081
:
啟動兩個服務,埠分別是8080
和8081
。在啟動8081
埠的服務,將前端連線埠改成8081
:
var ws = new WebSocket("ws://127.0.0.1:8081/message");