Java Websocket 02: 原生模式通過 Websocket 傳輸檔案

2023-06-19 12:01:25

目錄

Websocket 原生模式 傳輸檔案

關於 Websocket 傳輸的訊息型別, 允許的引數包括以下三類

  1. 以下型別之一, 同時只能出現一個
    • 文字型別 (text messages) 的訊息: String, Java primitive, 阻塞的 Stream Reader, 帶text decoder(Decoder.Text or Decoder.TextStream)的物件
    • 二進位制型別 (binary messages) 的訊息: byte[] 或 ByteBuffer, 阻塞的 InputStream, 帶 binary decoder (Decoder.Binary or Decoder.BinaryStream)的物件
    • Pong messages: PongMessage
  2. 通過 PathParam 指定的0個或多個基礎型別
  3. 對談引數 Session, 可選

因此對於不同的訊息型別, 可以有不同引數型別的 onMessage() 方法, 分別用於處理不同格式的內容, 對於傳輸檔案, 需要使用 ByteBuffer 型別的引數

void onMessage(ByteBuffer byteBuffer, Session session)

在處理過程中和普通的檔案傳輸是一樣的, 需要將檔案分片傳輸, 並約定合適的訊息頭用於判斷檔案傳輸的階段, 在伺服器端根據不同的階段進行檔案建立, 寫入和結束.

演示專案

與前一篇專案結構相同, 只需要修改 SocketServer 和 SocketClient

完整範例程式碼: https://github.com/MiltonLai/websocket-demos/tree/main/ws-demo02

SocketServer.java

增加了 onMessage(ByteBuffer byteBuffer, Session session) 方法用於處理二進位制訊息, 在方法中

  1. 先讀取第一個位元組的值, 根據不同的值對應不同的操作
    • 1 表示檔案傳輸前的準備
    • 3 表示檔案內容寫入
    • 5 表示檔案結束
  2. 再讀取後續的值
    • 1 解析出檔案元資訊, 並建立檔案通道
    • 3 將內容寫入檔案
    • 5 關閉檔案通道, 清除buffer
  3. 回傳ACK
    • 1 ACK 2
    • 3 不ACK
    • 5 ACK 6
@Component
@ServerEndpoint("/websocket/server/{sessionId}")
public class SocketServer {

    //...

    @OnMessage
    public void onMessage(ByteBuffer byteBuffer, Session session) throws IOException {
        if (byteBuffer.limit() == 0) {
            return;
        }

        byte mark = byteBuffer.get(0);
        if (mark == 1) {
            log.info("mark 1");
            byteBuffer.get();
            String info = new String(
                    byteBuffer.array(),
                    byteBuffer.position(),
                    byteBuffer.limit() - byteBuffer.position());
            FileInfo fileInfo = new JsonMapper().readValue(info, FileInfo.class);
            byteChannel = Files.newByteChannel(
                    Path.of("D:/data/" + fileInfo.getFileName()),
                    new StandardOpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE});
            //ack
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            buffer.put((byte) 2);
            buffer.put("receive fileinfo".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            session.getBasicRemote().sendBinary(buffer);
        } else if (mark == 3) {
            log.info("mark 3");
            byteBuffer.get();
            byteChannel.write(byteBuffer);
        } else if (mark == 5) {
            log.info("mark 5");
            //ack
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            buffer.clear();
            buffer.put((byte) 6);
            buffer.put("receive end".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            session.getBasicRemote().sendBinary(buffer);
            byteChannel.close();
            byteChannel = null;
        }
    }

    //...

    public static class FileInfo implements Serializable {
        private String fileName;
        private long fileSize;

        public String getFileName() {return fileName;}
        public void setFileName(String fileName) {this.fileName = fileName;}
        public long getFileSize() {return fileSize;}
        public void setFileSize(long fileSize) {this.fileSize = fileSize;}
    }
}

SocketClient.java

client 測試類, 連線後可以在命令列向 server 傳送訊息

首先是訊息處理中增加了 void onMessage(ByteBuffer bytes), 這個是用來接收伺服器端回傳的ACK的, 根據第一個位元組, 判斷伺服器端的處理結果. 這裡使用了一個 condition.notify() 用來通知傳送執行緒繼續傳送

其次是訊息傳送中, 用輸入的1觸發檔案傳送. 檔案傳送在 void sendFile(WebSocketClient webSocketClient, Object condition) 方法中進行, 通過一個 condition 物件, 在檔案開始傳輸和結束傳輸時控制執行緒的暫停和繼續. byteBuffer.flip()用於控制 byteBuffer 從狀態變為狀態, 用於傳送. flip is used to flip the ByteBuffer from "reading from I/O" (putting) to "writing to I/O" (getting).

public class SocketClient {

    private static final Logger log = LoggerFactory.getLogger(SocketClient.class);

    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {

        Object condition = new Object();

        WebSocketClient wsClient = new WebSocketClient(new URI("ws://127.0.0.1:8763/websocket/server/10001")) {

            //...

            @Override
            public void onMessage(ByteBuffer bytes) {
                //To overwrite
                byte mark = bytes.get(0);
                if (mark == 2) {
                    synchronized (condition) {
                        condition.notify();
                    }
                    log.info("receive ack for file info");
                } else if (mark == 6){
                    synchronized (condition) {
                        condition.notify();
                    }
                    log.info("receive ack for file end");
                }
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                log.info("On close: {}, {}, {}", i, s, b);
            }

            @Override
            public void onError(Exception e) {
                log.error("On error: {}", e.getMessage());
            }
        };

        wsClient.connect();

        log.info("Connecting ...");
        while (!ReadyState.OPEN.equals(wsClient.getReadyState())) {

        }
        log.info("Connected");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String line = scanner.next();
            if ("1".equals(line))
                sendFile(wsClient, condition);
            else
                wsClient.send(line);
        }
    }

    public static void sendFile(WebSocketClient webSocketClient, Object condition){
        new Thread(() -> {
            try {
                SeekableByteChannel byteChannel = Files.newByteChannel(
                        Path.of("/home/milton/Backup/linux/apache-tomcat-8.5.58.tar.gz"),
                        new StandardOpenOption[]{StandardOpenOption.READ});

                ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);

                byteBuffer.put((byte)1);
                String info = "{\"fileName\": \"greproto.tar.gz\", \"fileSize\":"+byteChannel.size()+"}";
                byteBuffer.put(info.getBytes(StandardCharsets.UTF_8));
                byteBuffer.flip();
                webSocketClient.send(byteBuffer);
                synchronized (condition) {
                    condition.wait();
                }

                byteBuffer.clear();
                byteBuffer.put((byte)3);
                while (byteChannel.read(byteBuffer) > 0) {
                    byteBuffer.flip();
                    webSocketClient.send(byteBuffer);
                    byteBuffer.clear();
                    byteBuffer.put((byte)3);
                }

                byteBuffer.clear();
                byteBuffer.put((byte)5);
                byteBuffer.put("end".getBytes(StandardCharsets.UTF_8));
                byteBuffer.flip();
                webSocketClient.send(byteBuffer);
                synchronized (condition) {
                    condition.wait();
                }
                byteChannel.close();

            } catch (InterruptedException|IOException e) {
                log.error(e.getMessage(), e);
            }

        }).start();
    }
}

執行範例

範例是一個普通的 Spring Boot jar專案, 可以通過mvn clean package進行編譯, 再通過java -jar ws-demo01.jar執行, 啟動後工作在8763埠

將 SocketClient.java 中的檔案路徑 D:/WorkJava/tmp/greproto.tar.gz 換成自己原生的檔案路徑, 執行 SocketClient, 可以觀察到伺服器端接收到的訊息. 如果輸入1並回車, 就會觸發使用者端往伺服器端傳輸檔案

參考