簡單記錄一下實現的整體框架,具體細節在實際生產中再細化就可以了。
第一步 引入netty依賴
SpringBoot的其他必要的依賴像Mybatis、Lombok這些都是老生常談了 就不在這裡放了
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.85.Final</version> </dependency>
第二步 接下來就是準備工作。
訊息服務類(核心程式碼) 聊天服務的功能就是靠這個類的start()函數來啟動的 繫結埠8087 之後可以通socket協定存取這個埠來執行通訊
import com.bxt.demo.im.handler.WebSocketHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Description: 即時通訊服務類 * @author: bhw * @date: 2023年09月27日 13:44 */ @Slf4j public class IMServer {
// 用來存放連入伺服器的使用者集合 public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024); // 用來存放建立的群聊連線 public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void start() throws InterruptedException { log.info("IM服務開始啟動"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); // 繫結埠 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 新增http編碼解碼器 pipeline.addLast(new HttpServerCodec()) //支援巨量資料流 .addLast(new ChunkedWriteHandler()) // 對http訊息做聚合操作 FullHttpRequest FullHttpResponse .addLast(new HttpObjectAggregator(1024*64)) //支援websocket .addLast(new WebSocketServerProtocolHandler("/")) .addLast(new WebSocketHandler()); } }); ChannelFuture future = bootstrap.bind(8087).sync(); log.info("伺服器啟動開始監聽埠: {}", 8087); future.channel().closeFuture().sync(); //關閉主執行緒組 bossGroup.shutdownGracefully(); //關閉工作執行緒組 workGroup.shutdownGracefully(); } }
建立聊天訊息實體類
/** * @Description: 聊天訊息物件 可以自行根據實際業務擴充套件 * @author: seizedays */ @Data public class ChatMessage extends IMCommand { //訊息型別 private Integer type; //訊息目標物件 private String target; //訊息內容 private String content; }
連線型別列舉類,暫時定義為建立連線、傳送訊息和加入群組三種狀態碼
@AllArgsConstructor @Getter public enum CommandType { //建立連線 CONNECT(10001), //傳送訊息 CHAT(10002), //加入群聊 JOIN_GROUP(10003), ERROR(-1) ; private Integer code; public static CommandType match(Integer code){ for (CommandType value : CommandType.values()) { if (value.code.equals(code)){ return value; } } return ERROR; } }
命令動作為聊天的時候 訊息型別又劃分為私聊和群聊兩種 列舉類如下:
@AllArgsConstructor @Getter public enum MessageType { //私聊 PRIVATE(1), //群聊 GROUP(2), ERROR(-1) ; private Integer type; public static MessageType match(Integer code){ for (MessageType value : MessageType.values()) { if (value.type.equals(code)){ return value; } } return ERROR; } }
建立連線請求的攔截器
import com.alibaba.fastjson2.JSON; import com.bxt.common.vo.Result; import com.bxt.demo.im.cmd.IMCommand; import com.bxt.demo.im.server.IMServer; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * @Description: 使用者連線到伺服器端的攔截器 * @author: bhw * @date: 2023年09月27日 14:28 */ public class ConnectionHandler { public static void execute(ChannelHandlerContext ctx, IMCommand command) { if (IMServer.USERS.containsKey(command.getNickName())) { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已經線上,不能重複連線")))); ctx.channel().disconnect(); return; } IMServer.USERS.put(command.getNickName(), ctx.channel()); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系統訊息:" + command.getNickName() + "與伺服器端連線成功")))); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet()))))); } }
加入群組功能的攔截器
/** * @Description: 加入群聊攔截器 * @author: bhw * @date: 2023年09月27日 15:07 */ public class JoinGroupHandler { public static void execute(ChannelHandlerContext ctx) { try { IMServer.GROUP.add(ctx.channel()); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系統預設群組成功!")))); } catch (Exception e) { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("訊息內容異常")))); } } }
傳送聊天到指定物件的功能攔截器
import com.alibaba.excel.util.StringUtils; import com.alibaba.fastjson2.JSON; import com.bxt.common.vo.Result; import com.bxt.demo.im.cmd.ChatMessage; import com.bxt.demo.im.cmd.MessageType; import com.bxt.demo.im.server.IMServer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.util.Objects; /** * @Description: 聊天攔截器 * @author: bhw * @date: 2023年09月27日 15:07 */ public class ChatHandler { public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) { try { ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class); MessageType msgType = MessageType.match(message.getType()); if (msgType.equals(MessageType.PRIVATE)) { if (StringUtils.isBlank(message.getTarget())){ ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統訊息:訊息傳送失敗,請選擇訊息傳送物件")))); return; } Channel channel = IMServer.USERS.get(message.getTarget()); if (Objects.isNull(channel) || !channel.isActive()){ ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統訊息:訊息傳送失敗,對方不線上")))); IMServer.USERS.remove(message.getTarget()); return; } channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊訊息(" + message.getTarget() + "):" + message.getContent())))); } else if (msgType.equals(MessageType.GROUP)) { IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群訊息:傳送者(" + message.getNickName() + "):" + message.getContent())))); }else { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統訊息:不支援的訊息型別")))); } } catch (Exception e) { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("訊息內容異常")))); } } }
最後是websocket攔截器 接收到使用者端的指令後選擇對應的攔截器實現相應的功能:
import com.alibaba.fastjson2.JSON; import com.bxt.common.vo.Result; import com.bxt.demo.im.cmd.CommandType; import com.bxt.demo.im.cmd.IMCommand; import com.bxt.demo.im.server.IMServer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; /** * @Description: websocket攔截器 * @author: bhw * @date: 2023年09月27日 13:59 */ @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) { System.out.println(frame.text()); try { IMCommand command = JSON.parseObject(frame.text(), IMCommand.class); CommandType cmdType = CommandType.match(command.getCode()); if (cmdType.equals(CommandType.CONNECT)){ ConnectionHandler.execute(ctx, command); } else if (cmdType.equals(CommandType.CHAT)) { ChatHandler.execute(ctx,frame); } else if (cmdType.equals(CommandType.JOIN_GROUP)) { JoinGroupHandler.execute(ctx); } else { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支援的code")))); } }catch (Exception e){ ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage())))); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 當連線斷開時被呼叫 Channel channel = ctx.channel(); // 從 USERS Map 中移除對應的 Channel removeUser(channel); super.channelInactive(ctx); } private void removeUser(Channel channel) { // 遍歷 USERS Map,找到並移除對應的 Channel IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel); } }
第三步 啟動服務
@SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); // 啟動IM服務 try { IMServer.start(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
現在 使用者端通過socket協定存取8087埠即可實現基本的聊天室功能了!