基於SpringBoot+Netty實現即時通訊(IM)功能

2023-10-18 18:00:45

簡單記錄一下實現的整體框架,具體細節在實際生產中再細化就可以了。

第一步 引入netty依賴

SpringBoot的其他必要的依賴像Mybatis、Lombok這些都是老生常談了 就不在這裡放了

       <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.85.Final</version>
        </dependency>

 

第二步 接下來就是準備工作。

訊息服務類(核心程式碼) 聊天服務的功能就是靠這個類的start()函數來啟動的 繫結埠8087 之後可以通socket協定存取這個埠來執行通訊

import com.bxt.demo.im.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Description: 即時通訊服務類
 * @author: bhw
 * @date: 2023年09月27日 13:44
 */
@Slf4j
public class IMServer {
  // 用來存放連入伺服器的使用者集合
public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024);   // 用來存放建立的群聊連線 public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void start() throws InterruptedException { log.info("IM服務開始啟動"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); // 繫結埠 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 新增http編碼解碼器 pipeline.addLast(new HttpServerCodec()) //支援巨量資料流 .addLast(new ChunkedWriteHandler()) // 對http訊息做聚合操作 FullHttpRequest FullHttpResponse .addLast(new HttpObjectAggregator(1024*64)) //支援websocket .addLast(new WebSocketServerProtocolHandler("/")) .addLast(new WebSocketHandler()); } }); ChannelFuture future = bootstrap.bind(8087).sync(); log.info("伺服器啟動開始監聽埠: {}", 8087); future.channel().closeFuture().sync(); //關閉主執行緒組 bossGroup.shutdownGracefully(); //關閉工作執行緒組 workGroup.shutdownGracefully(); } }

 

 建立聊天訊息實體類

/**
 * @Description: 聊天訊息物件 可以自行根據實際業務擴充套件
 * @author: seizedays
 */
@Data
public class ChatMessage extends IMCommand {
    //訊息型別
    private Integer type;
    //訊息目標物件
    private String target;
    //訊息內容
    private String content;

}

連線型別列舉類,暫時定義為建立連線、傳送訊息和加入群組三種狀態碼

@AllArgsConstructor
@Getter
public enum CommandType {

    //建立連線
    CONNECT(10001),
    //傳送訊息
    CHAT(10002),
    //加入群聊
    JOIN_GROUP(10003),
    ERROR(-1)
    ;


    private Integer code;

    public static CommandType match(Integer code){
        for (CommandType value : CommandType.values()) {
            if (value.code.equals(code)){
                return value;
            }
        }
        return ERROR;
    }

}

命令動作為聊天的時候 訊息型別又劃分為私聊和群聊兩種 列舉類如下:

@AllArgsConstructor
@Getter
public enum MessageType {

    //私聊
    PRIVATE(1),
    //群聊
    GROUP(2),
    ERROR(-1)
    ;
    private Integer type;

    public static MessageType match(Integer code){
        for (MessageType value : MessageType.values()) {
            if (value.type.equals(code)){
                return value;
            }
        }
        return ERROR;
    }

}

 

建立連線請求的攔截器

import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.IMCommand;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * @Description: 使用者連線到伺服器端的攔截器
 * @author: bhw
 * @date: 2023年09月27日 14:28
 */
public class ConnectionHandler {
    public static void execute(ChannelHandlerContext ctx, IMCommand command) {
        if (IMServer.USERS.containsKey(command.getNickName())) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已經線上,不能重複連線"))));
            ctx.channel().disconnect();
            return;
        }

        IMServer.USERS.put(command.getNickName(), ctx.channel());

        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系統訊息:" + command.getNickName() + "與伺服器端連線成功"))));

        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet())))));
    }
}

加入群組功能的攔截器

/**
 * @Description: 加入群聊攔截器
 * @author: bhw
 * @date: 2023年09月27日 15:07
 */
public class JoinGroupHandler {
    public static void execute(ChannelHandlerContext ctx) {
        try {
            IMServer.GROUP.add(ctx.channel());
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系統預設群組成功!"))));
        } catch (Exception e) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("訊息內容異常"))));
        }

    }
}

傳送聊天到指定物件的功能攔截器

import com.alibaba.excel.util.StringUtils;
import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.ChatMessage;
import com.bxt.demo.im.cmd.MessageType;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Objects;

/**
 * @Description: 聊天攔截器
 * @author: bhw
 * @date: 2023年09月27日 15:07
 */
public class ChatHandler {
    public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        try {
            ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class);
            MessageType msgType = MessageType.match(message.getType());

            if (msgType.equals(MessageType.PRIVATE)) {
                if (StringUtils.isBlank(message.getTarget())){
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統訊息:訊息傳送失敗,請選擇訊息傳送物件"))));
                    return;
                }
                Channel channel = IMServer.USERS.get(message.getTarget());
                if (Objects.isNull(channel) || !channel.isActive()){
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統訊息:訊息傳送失敗,對方不線上"))));
                    IMServer.USERS.remove(message.getTarget());
                    return;
                }
                channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊訊息(" + message.getTarget() + "):" + message.getContent()))));

            } else if (msgType.equals(MessageType.GROUP)) {
                IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群訊息:傳送者(" + message.getNickName() + "):" + message.getContent()))));
            }else {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統訊息:不支援的訊息型別"))));
            }


        } catch (Exception e) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("訊息內容異常"))));
        }

    }
}

最後是websocket攔截器 接收到使用者端的指令後選擇對應的攔截器實現相應的功能:

import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.CommandType;
import com.bxt.demo.im.cmd.IMCommand;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

/**
 * @Description: websocket攔截器
 * @author: bhw
 * @date: 2023年09月27日 13:59
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        System.out.println(frame.text());
        try {
            IMCommand command = JSON.parseObject(frame.text(), IMCommand.class);
            CommandType cmdType = CommandType.match(command.getCode());
            if (cmdType.equals(CommandType.CONNECT)){
                ConnectionHandler.execute(ctx, command);
            } else if (cmdType.equals(CommandType.CHAT)) {
                ChatHandler.execute(ctx,frame);
            } else if (cmdType.equals(CommandType.JOIN_GROUP)) {
                JoinGroupHandler.execute(ctx);
            } else {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支援的code"))));
            }
        }catch (Exception e){
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage()))));
        }

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 當連線斷開時被呼叫
        Channel channel = ctx.channel();
        // 從 USERS Map 中移除對應的 Channel
        removeUser(channel);
        super.channelInactive(ctx);
    }

    private void removeUser(Channel channel) {
        // 遍歷 USERS Map,找到並移除對應的 Channel
        IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel);
    }
}

第三步 啟動服務

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
       // 啟動IM服務
        try {
            IMServer.start();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

現在 使用者端通過socket協定存取8087埠即可實現基本的聊天室功能了!