關於 Websocket 傳輸的訊息型別, 允許的引數包括以下三類
因此對於不同的訊息型別, 可以有不同引數型別的 onMessage() 方法, 分別用於處理不同格式的內容, 對於傳輸檔案, 需要使用 ByteBuffer 型別的引數
void onMessage(ByteBuffer byteBuffer, Session session)
在處理過程中和普通的檔案傳輸是一樣的, 需要將檔案分片傳輸, 並約定合適的訊息頭用於判斷檔案傳輸的階段, 在伺服器端根據不同的階段進行檔案建立, 寫入和結束.
與前一篇專案結構相同, 只需要修改 SocketServer 和 SocketClient
完整範例程式碼: https://github.com/MiltonLai/websocket-demos/tree/main/ws-demo02
增加了 onMessage(ByteBuffer byteBuffer, Session session)
方法用於處理二進位制訊息, 在方法中
@Component
@ServerEndpoint("/websocket/server/{sessionId}")
public class SocketServer {
//...
@OnMessage
public void onMessage(ByteBuffer byteBuffer, Session session) throws IOException {
if (byteBuffer.limit() == 0) {
return;
}
byte mark = byteBuffer.get(0);
if (mark == 1) {
log.info("mark 1");
byteBuffer.get();
String info = new String(
byteBuffer.array(),
byteBuffer.position(),
byteBuffer.limit() - byteBuffer.position());
FileInfo fileInfo = new JsonMapper().readValue(info, FileInfo.class);
byteChannel = Files.newByteChannel(
Path.of("D:/data/" + fileInfo.getFileName()),
new StandardOpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE});
//ack
ByteBuffer buffer = ByteBuffer.allocate(4096);
buffer.put((byte) 2);
buffer.put("receive fileinfo".getBytes(StandardCharsets.UTF_8));
buffer.flip();
session.getBasicRemote().sendBinary(buffer);
} else if (mark == 3) {
log.info("mark 3");
byteBuffer.get();
byteChannel.write(byteBuffer);
} else if (mark == 5) {
log.info("mark 5");
//ack
ByteBuffer buffer = ByteBuffer.allocate(4096);
buffer.clear();
buffer.put((byte) 6);
buffer.put("receive end".getBytes(StandardCharsets.UTF_8));
buffer.flip();
session.getBasicRemote().sendBinary(buffer);
byteChannel.close();
byteChannel = null;
}
}
//...
public static class FileInfo implements Serializable {
private String fileName;
private long fileSize;
public String getFileName() {return fileName;}
public void setFileName(String fileName) {this.fileName = fileName;}
public long getFileSize() {return fileSize;}
public void setFileSize(long fileSize) {this.fileSize = fileSize;}
}
}
client 測試類, 連線後可以在命令列向 server 傳送訊息
首先是訊息處理中增加了 void onMessage(ByteBuffer bytes)
, 這個是用來接收伺服器端回傳的ACK的, 根據第一個位元組, 判斷伺服器端的處理結果. 這裡使用了一個 condition.notify()
用來通知傳送執行緒繼續傳送
其次是訊息傳送中, 用輸入的1
觸發檔案傳送. 檔案傳送在 void sendFile(WebSocketClient webSocketClient, Object condition)
方法中進行, 通過一個 condition 物件, 在檔案開始傳輸和結束傳輸時控制執行緒的暫停和繼續. byteBuffer.flip()
用於控制 byteBuffer 從寫狀態變為讀狀態, 用於傳送. flip is used to flip the ByteBuffer from "reading from I/O" (putting) to "writing to I/O" (getting).
public class SocketClient {
private static final Logger log = LoggerFactory.getLogger(SocketClient.class);
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
Object condition = new Object();
WebSocketClient wsClient = new WebSocketClient(new URI("ws://127.0.0.1:8763/websocket/server/10001")) {
//...
@Override
public void onMessage(ByteBuffer bytes) {
//To overwrite
byte mark = bytes.get(0);
if (mark == 2) {
synchronized (condition) {
condition.notify();
}
log.info("receive ack for file info");
} else if (mark == 6){
synchronized (condition) {
condition.notify();
}
log.info("receive ack for file end");
}
}
@Override
public void onClose(int i, String s, boolean b) {
log.info("On close: {}, {}, {}", i, s, b);
}
@Override
public void onError(Exception e) {
log.error("On error: {}", e.getMessage());
}
};
wsClient.connect();
log.info("Connecting ...");
while (!ReadyState.OPEN.equals(wsClient.getReadyState())) {
}
log.info("Connected");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String line = scanner.next();
if ("1".equals(line))
sendFile(wsClient, condition);
else
wsClient.send(line);
}
}
public static void sendFile(WebSocketClient webSocketClient, Object condition){
new Thread(() -> {
try {
SeekableByteChannel byteChannel = Files.newByteChannel(
Path.of("/home/milton/Backup/linux/apache-tomcat-8.5.58.tar.gz"),
new StandardOpenOption[]{StandardOpenOption.READ});
ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
byteBuffer.put((byte)1);
String info = "{\"fileName\": \"greproto.tar.gz\", \"fileSize\":"+byteChannel.size()+"}";
byteBuffer.put(info.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
webSocketClient.send(byteBuffer);
synchronized (condition) {
condition.wait();
}
byteBuffer.clear();
byteBuffer.put((byte)3);
while (byteChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
webSocketClient.send(byteBuffer);
byteBuffer.clear();
byteBuffer.put((byte)3);
}
byteBuffer.clear();
byteBuffer.put((byte)5);
byteBuffer.put("end".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
webSocketClient.send(byteBuffer);
synchronized (condition) {
condition.wait();
}
byteChannel.close();
} catch (InterruptedException|IOException e) {
log.error(e.getMessage(), e);
}
}).start();
}
}
範例是一個普通的 Spring Boot jar專案, 可以通過mvn clean package進行編譯, 再通過java -jar ws-demo01.jar執行, 啟動後工作在8763埠
將 SocketClient.java 中的檔案路徑 D:/WorkJava/tmp/greproto.tar.gz
換成自己原生的檔案路徑, 執行 SocketClient, 可以觀察到伺服器端接收到的訊息. 如果輸入1
並回車, 就會觸發使用者端往伺服器端傳輸檔案