SpringBoot+Netty+WebSocket實現訊息傳送

2020-09-20 04:00:05

SpringBoot搭建Netty實現訊息傳送

一.匯入Netty依賴

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

二.搭建websocket伺服器

@Component
public class WebSocketServer {

    /**
     * 主執行緒池
     */
    private EventLoopGroup bossGroup;
    /**
     * 工作執行緒池
     */
    private EventLoopGroup workerGroup;
    /**
     * 伺服器
     */
    private ServerBootstrap server;
    /**
     *  回撥
     */
    private ChannelFuture future;

    public void start() {
        future = server.bind(9001);
        System.out.println("netty server - 啟動成功");
    }

    public WebSocketServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();

        server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebsocketInitializer());
    }
}

三.初始化Websocket


public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
  
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // ------------------
        // 用於支援Http協定
        // ------------------
        // websocket基於http協定,需要有http的編解碼器
        pipeline.addLast(new HttpServerCodec());
        // 對寫巨量資料流的支援
        pipeline.addLast(new ChunkedWriteHandler());
        // 新增對HTTP請求和響應的聚合器:只要使用Netty進行Http程式設計都需要使用
        //設定單次請求的檔案的大小
        pipeline.addLast(new HttpObjectAggregator(1024 * 64));
        //webSocket 伺服器處理的協定,用於指定給使用者端連線存取的路由 :/ws
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // 新增Netty空閒超時檢查的支援
        // 1. 讀空閒超時(超過一定的時間會傳送對應的事件訊息)
        // 2. 寫空閒超時
        // 3. 讀寫空閒超時
        pipeline.addLast(new IdleStateHandler(4, 8, 12));
        //新增心跳處理
        pipeline.addLast(new HearBeatHandler());
        // 新增自定義的handler
        pipeline.addLast(new ChatHandler());

    }
}

四.建立Netty監聽器


@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private WebSocketServer websocketServer;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(event.getApplicationContext().getParent() == null) {
            try {
                websocketServer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

五.建立訊息通道

public class UserChannelMap {
    /**
     * 使用者儲存使用者id與通道的Map物件
     */
//    private static Map<String, Channel> userChannelMap;

   /* static {
        userChannelMap = new HashMap<String, Channel>();
    }*/

    /**
     * 定義一個channel組,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全域性的事件執行器,是一個單例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放使用者與Chanel的對應資訊,用於給指定使用者傳送訊息
     */
    private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

    private UserChannelMap(){}
    /**
     * 新增使用者id與channel的關聯
     * @param userNum
     * @param channel
     */
    public static void put(String userNum, Channel channel) {
        userChannelMap.put(userNum, channel);
    }

    /**
     * 根據使用者id移除使用者id與channel的關聯
     * @param userNum
     */
    public static void remove(String userNum) {
        userChannelMap.remove(userNum);
    }

    /**
     * 根據通道id移除使用者與channel的關聯
     * @param channelId 通道的id
     */
    public static void removeByChannelId(String channelId) {
        if(!StringUtils.isNotBlank(channelId)) {
            return;
        }
        for (String s : userChannelMap.keySet()) {
            Channel channel = userChannelMap.get(s);
            if(channelId.equals(channel.id().asLongText())) {
                System.out.println("使用者端連線斷開,取消使用者" + s + "與通道" + channelId + "的關聯");
                userChannelMap.remove(s);
                UserService userService = SpringUtil.getBean(UserService.class);
                userService.logout(s);
                break;
            }
        }
    }

    /**
     * 列印所有的使用者與通道的關聯資料
     */
    public static void print() {
        for (String s : userChannelMap.keySet()) {
            System.out.println("使用者id:" + s + " 通道:" + userChannelMap.get(s).id());
        }
    }

    /**
     * 根據好友id獲取對應的通道
     * @param receiverNum 接收人編號
     * @return Netty通道
     */
    public static Channel get(String receiverNum) {
        return userChannelMap.get(receiverNum);
    }

    /**
     * 獲取channel組
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 獲取使用者channel map
     * @return
     */
    public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
        return userChannelMap;
    }
}

六.自定義訊息型別


public class Message {
    /**
     * 訊息型別
     */
    private Integer type;
    /**
     * 聊天訊息
     */
    private String message;
    /**
     * 擴充套件訊息欄位
     */
    private Object ext;
    public Integer getType() {
        return type;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    public MarketChatRecord getChatRecord() {
        return marketChatRecord;
    }
    public void setChatRecord(MarketChatRecord chatRecord) {
        this.marketChatRecord = chatRecord;
    }

    public Object getExt() {
        return ext;
    }

    public void setExt(Object ext) {
        this.ext = ext;
    }

    @Override
    public String toString() {
        return "Message{" +
                "type=" + type +
                ", marketChatRecord=" + marketChatRecord +
                ", ext=" + ext +
                '}';
    }

}

七.建立處理訊息的handler

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);


    /**
     * 用來儲存所有的使用者端連線
     */
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     *當Channel中有新的事件訊息會自動呼叫
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 當接收到資料後會自動呼叫
        // 獲取使用者端傳送過來的文字訊息
        Gson gson = new Gson();
        log.info("伺服器收到訊息:{}",msg.text());
        System.out.println("接收到訊息資料為:" + msg.text());
        Message message = gson.fromJson(msg.text(), Message.class);    
//根據業務要求進行訊息處理
        switch (message.getType()) {
            // 處理使用者端連線的訊息
            case 0:
                // 建立使用者與通道的關聯
            // 處理使用者端傳送好友訊息
             break;
            case 1:
            // 處理使用者端的簽收訊息
             break;
            case 2:
                // 將訊息記錄設定為已讀
                break;
            case 3:
                // 接收心跳訊息
                break;
            default:
                break;
        }

    }

    // 當有新的使用者端連線伺服器之後,會自動呼叫這個方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded 被呼叫"+ctx.channel().id().asLongText());
        // 新增到channelGroup 通道組
        UserChannelMap.getChannelGroup().add(ctx.channel());
//        clients.add(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("{異常:}"+cause.getMessage());
        // 刪除通道
        UserChannelMap.getChannelGroup().remove(ctx.channel());
        UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
        ctx.channel().close();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 被呼叫"+ctx.channel().id().asLongText());
        //刪除通道
        UserChannelMap.getChannelGroup().remove(ctx.channel());
        UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
        UserChannelMap.print();
    }

}

八.處理心跳

public class HearBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent)evt;

            if(idleStateEvent.state() == IdleState.READER_IDLE) {
                System.out.println("讀空閒事件觸發...");
            }
            else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
                System.out.println("寫空閒事件觸發...");
            }
            else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
                System.out.println("---------------");
                System.out.println("讀寫空閒事件觸發");
                System.out.println("關閉通道資源");
                ctx.channel().close();
            }
        }
    }
}

搭建完成後呼叫測試

1.頁面存取http://localhost:9001/ws
2.埠號9001和存取路徑ws都是我們在上邊設定的,然後傳入我們自定義的訊息message型別。
3.大概流程:訊息傳送 :使用者1先連線通道,然後傳送訊息給使用者2,使用者2若是線上直接可以傳送給使用者,若沒線上可以將訊息暫存在redis或者通道里,使用者2連結通道的話,兩者可以直接通訊。
訊息推播 :使用者1連線通道,根據通道id查詢要推播的人是否線上,或者推播給所有人,這裡我只推播給指定的人。
#todo