一.匯入Netty依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
二.搭建websocket伺服器
@Component
public class WebSocketServer {
/**
* 主執行緒池
*/
private EventLoopGroup bossGroup;
/**
* 工作執行緒池
*/
private EventLoopGroup workerGroup;
/**
* 伺服器
*/
private ServerBootstrap server;
/**
* 回撥
*/
private ChannelFuture future;
public void start() {
future = server.bind(9001);
System.out.println("netty server - 啟動成功");
}
public WebSocketServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketInitializer());
}
}
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ------------------
// 用於支援Http協定
// ------------------
// websocket基於http協定,需要有http的編解碼器
pipeline.addLast(new HttpServerCodec());
// 對寫巨量資料流的支援
pipeline.addLast(new ChunkedWriteHandler());
// 新增對HTTP請求和響應的聚合器:只要使用Netty進行Http程式設計都需要使用
//設定單次請求的檔案的大小
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
//webSocket 伺服器處理的協定,用於指定給使用者端連線存取的路由 :/ws
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 新增Netty空閒超時檢查的支援
// 1. 讀空閒超時(超過一定的時間會傳送對應的事件訊息)
// 2. 寫空閒超時
// 3. 讀寫空閒超時
pipeline.addLast(new IdleStateHandler(4, 8, 12));
//新增心跳處理
pipeline.addLast(new HearBeatHandler());
// 新增自定義的handler
pipeline.addLast(new ChatHandler());
}
}
四.建立Netty監聽器
@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private WebSocketServer websocketServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if(event.getApplicationContext().getParent() == null) {
try {
websocketServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
五.建立訊息通道
public class UserChannelMap {
/**
* 使用者儲存使用者id與通道的Map物件
*/
// private static Map<String, Channel> userChannelMap;
/* static {
userChannelMap = new HashMap<String, Channel>();
}*/
/**
* 定義一個channel組,管理所有的channel
* GlobalEventExecutor.INSTANCE 是全域性的事件執行器,是一個單例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存放使用者與Chanel的對應資訊,用於給指定使用者傳送訊息
*/
private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();
private UserChannelMap(){}
/**
* 新增使用者id與channel的關聯
* @param userNum
* @param channel
*/
public static void put(String userNum, Channel channel) {
userChannelMap.put(userNum, channel);
}
/**
* 根據使用者id移除使用者id與channel的關聯
* @param userNum
*/
public static void remove(String userNum) {
userChannelMap.remove(userNum);
}
/**
* 根據通道id移除使用者與channel的關聯
* @param channelId 通道的id
*/
public static void removeByChannelId(String channelId) {
if(!StringUtils.isNotBlank(channelId)) {
return;
}
for (String s : userChannelMap.keySet()) {
Channel channel = userChannelMap.get(s);
if(channelId.equals(channel.id().asLongText())) {
System.out.println("使用者端連線斷開,取消使用者" + s + "與通道" + channelId + "的關聯");
userChannelMap.remove(s);
UserService userService = SpringUtil.getBean(UserService.class);
userService.logout(s);
break;
}
}
}
/**
* 列印所有的使用者與通道的關聯資料
*/
public static void print() {
for (String s : userChannelMap.keySet()) {
System.out.println("使用者id:" + s + " 通道:" + userChannelMap.get(s).id());
}
}
/**
* 根據好友id獲取對應的通道
* @param receiverNum 接收人編號
* @return Netty通道
*/
public static Channel get(String receiverNum) {
return userChannelMap.get(receiverNum);
}
/**
* 獲取channel組
* @return
*/
public static ChannelGroup getChannelGroup() {
return channelGroup;
}
/**
* 獲取使用者channel map
* @return
*/
public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
return userChannelMap;
}
}
六.自定義訊息型別
public class Message {
/**
* 訊息型別
*/
private Integer type;
/**
* 聊天訊息
*/
private String message;
/**
* 擴充套件訊息欄位
*/
private Object ext;
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public MarketChatRecord getChatRecord() {
return marketChatRecord;
}
public void setChatRecord(MarketChatRecord chatRecord) {
this.marketChatRecord = chatRecord;
}
public Object getExt() {
return ext;
}
public void setExt(Object ext) {
this.ext = ext;
}
@Override
public String toString() {
return "Message{" +
"type=" + type +
", marketChatRecord=" + marketChatRecord +
", ext=" + ext +
'}';
}
}
七.建立處理訊息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 用來儲存所有的使用者端連線
*/
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
*當Channel中有新的事件訊息會自動呼叫
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 當接收到資料後會自動呼叫
// 獲取使用者端傳送過來的文字訊息
Gson gson = new Gson();
log.info("伺服器收到訊息:{}",msg.text());
System.out.println("接收到訊息資料為:" + msg.text());
Message message = gson.fromJson(msg.text(), Message.class);
//根據業務要求進行訊息處理
switch (message.getType()) {
// 處理使用者端連線的訊息
case 0:
// 建立使用者與通道的關聯
// 處理使用者端傳送好友訊息
break;
case 1:
// 處理使用者端的簽收訊息
break;
case 2:
// 將訊息記錄設定為已讀
break;
case 3:
// 接收心跳訊息
break;
default:
break;
}
}
// 當有新的使用者端連線伺服器之後,會自動呼叫這個方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("handlerAdded 被呼叫"+ctx.channel().id().asLongText());
// 新增到channelGroup 通道組
UserChannelMap.getChannelGroup().add(ctx.channel());
// clients.add(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("{異常:}"+cause.getMessage());
// 刪除通道
UserChannelMap.getChannelGroup().remove(ctx.channel());
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
ctx.channel().close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("handlerRemoved 被呼叫"+ctx.channel().id().asLongText());
//刪除通道
UserChannelMap.getChannelGroup().remove(ctx.channel());
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
UserChannelMap.print();
}
}
八.處理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
if(idleStateEvent.state() == IdleState.READER_IDLE) {
System.out.println("讀空閒事件觸發...");
}
else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("寫空閒事件觸發...");
}
else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
System.out.println("---------------");
System.out.println("讀寫空閒事件觸發");
System.out.println("關閉通道資源");
ctx.channel().close();
}
}
}
}
搭建完成後呼叫測試
1.頁面存取http://localhost:9001/ws
2.埠號9001和存取路徑ws都是我們在上邊設定的,然後傳入我們自定義的訊息message型別。
3.大概流程:訊息傳送 :使用者1先連線通道,然後傳送訊息給使用者2,使用者2若是線上直接可以傳送給使用者,若沒線上可以將訊息暫存在redis或者通道里,使用者2連結通道的話,兩者可以直接通訊。
訊息推播 :使用者1連線通道,根據通道id查詢要推播的人是否線上,或者推播給所有人,這裡我只推播給指定的人。
#todo