SpringBoot整合Websocket,實現作為使用者端接收訊息的同時作為伺服器端向下遊客戶傳送訊息

2023-07-21 18:06:08

SpringBoot整合Websocket

1. SpringBoot作為伺服器端

作為伺服器端時,需要先匯入websocket的依賴

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

建立WebSocketServer工具類

package com.newlinker.jiangyin.utils;

/**
 * @author cyl
 * @time 2023/7/21
 */
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
@ServerEndpoint(value = "/websocket")
public class WebSocketServer {

    // 使用者端對談列表
    private static final Map<String, Session> clientSessions = new ConcurrentHashMap<>();

    @OnOpen
    public void onServerOpen(Session session) {
        // 使用者端連線到本地 WebSocket 服務
        System.out.println("Client connected: " + session.getId());
        clientSessions.put(session.getId(), session);
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        // 處理使用者端傳送的訊息
        System.out.println("Received message from client " + session.getId() + ": " + message);

        // 範例:將收到的訊息廣播給所有使用者端
        //broadcast(message);
    }

    @OnClose
    public void onServerClose(Session session, CloseReason reason) {
        // 使用者端斷開連線
        System.out.println("Client " + session.getId() + " disconnected: " + reason);
        clientSessions.remove(session.getId());
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        // 使用者端連線發生錯誤
        System.out.println("WebSocket client error: " + throwable.getMessage());
        clientSessions.remove(session.getId());
    }

    // 傳送訊息給指定使用者端
    public void sendToClient(String clientId, String message) {
        Session session = clientSessions.get(clientId);
        if (session != null && session.isOpen()) {
            session.getAsyncRemote().sendText(message);
        }
    }

    // 廣播訊息給所有使用者端
    public void broadcast(String message) {
        for (Session session : clientSessions.values()) {
            if (session.isOpen()) {
                session.getAsyncRemote().sendText(message);
            }
        }
    }

    // 關閉使用者端連線
    public void closeClientConnection(String clientId) {
        Session session = clientSessions.get(clientId);
        if (session != null && session.isOpen()) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Closing connection"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

新增Spring Bean設定

package com.newlinker.jiangyin.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author cyl
 * @time 2022/4/11
 */

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

至此,SpringBoot已可作為伺服器端進行websocket連線測試,測試時的路徑為:

ws://localhost:port/websocket

其中若SpringBoot設定了ssl證書可提供https存取,則應將websocket連線協定更改為wss

websocket路徑中的"/websocket"由@ServerEndpoint註解決定,推薦使用線上測試,簡單方便

2. SpringBoot作為使用者端

作為使用者端,推薦使用okhttp的依賴以及google的gson轉換包(可與上方的依賴共存,不用擔心)

<dependency>
	<groupId>com.squareup.okhttp3</groupId>
	<artifactId>okhttp</artifactId>
	<version>4.9.1</version>
</dependency>
<!-- 非必須,可以使用其他JSON包進行處理 -->
<dependency>
	<groupId>com.google.code.gson</groupId>
	<artifactId>gson</artifactId>
	<version>2.8.9</version>
</dependency>

建立WebSocketClient工具類

package com.newlinker.jiangyin.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.newlinker.jiangyin.config.XingHuoConfig;
import com.newlinker.jiangyin.entity.ro.Payload;
import com.newlinker.jiangyin.entity.ro.ResponseData;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.websocket.OnError;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author cyl
 * @time 2022/4/11
 */
@Component
public class WebSocketClient extends WebSocketListener {
    //注入你的WebSocketServer工具類
    @Autowired
    private WebSocketServer webSocketServer;

    private WebSocket webSocket;

    // 使用者端連線其他伺服器
    public void connectToServer(String serverUrl) {
        OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
        Request request = new Request.Builder().url(serverUrl).build();
        webSocket = okHttpClient.newWebSocket(request, this);
    }


    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);

    }
	
    //收到訊息時觸發,核心邏輯
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        ResponseData responseData = GSON.fromJson(text, ResponseData.class);
        //此處伺服器返回的status值為0時代表連線正常,由介面具體情況而定,與協定無關
        if (0 == responseData.getHeader().get("code").getAsInt()) {
                Payload pl =GSON.fromJson(responseData.getPayload(), Payload.class);
                JsonArray temp = (JsonArray) pl.getChoices().get("text");
                JsonObject jo = (JsonObject) temp.get(0); 
            //解析結果後將內容轉發給下游使用者端,也可以使用sendMessage方法定向傳送
                webSocketServer.broadcast(jo.get("content").getAsString());
            //如果不想每次傳送訊息時都主動連線,需要建立websocket心跳,這裡每次收發訊息都主動斷開
                webSocket.close(3, "使用者端主動斷開連結");
            }
        } else {
            System.out.println("返回結果錯誤:\n" + responseData.getHeader().get("code") + " " + responseData.getHeader().get("message"));
        }
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        System.out.println("WebSocket連線失敗:");
        super.onFailure(webSocket, t, response);
        System.out.println(response);
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        System.out.println("WebSocket發生錯誤:" + throwable.getMessage());
    }
	
	//可以在Controller中呼叫該方法進行websocket的手動傳送以及引數調整
    public void sendMessage(String word) {
        connectToServer();
        JsonObject frame = new JsonObject();
        //根據自己的需求填充你的請求引數
        //...
        webSocket.send(frame.toString());
        System.out.println(frame.toString());
    }
}