三此君看了好幾本書,看了很多遍原始碼整理的 一張圖進階 RocketMQ 圖片,關於 RocketMQ 你只需要記住這張圖!覺得不錯的話,記得點贊關注哦。
【重要】視訊在 B 站同步更新,歡迎圍觀,輕輕鬆鬆漲姿勢。一張圖進階 RocketMQ-通訊機制(視訊版)
點選檢視【bilibili】
本文是「一張圖進階 RocketMQ」第 4 篇,對 RocketMQ 不瞭解的同學可以先看看前面三期:
上一期分享了 RocketMQ 生產者啟動流程及同步訊息傳送流程,我們知道了在通訊層是基於 Netty 將訊息傳遞給 Broker 進行儲存的。如果對 Netty 完全不瞭解我們就很難真正理解 RocketMQ,所以今天我們簡單的聊一聊 Netty 基本流程,然後分析 RocketMQ 的通訊機制,最後通過非同步訊息傳送來串聯 RocketMQ 通訊機制。
Netty 有很多概念,等介紹完概念大家都困了,我們就不過多介紹了,直接結合範例來看看 Netty 的基礎流程,能夠幫助我們更好的理解 RocketMQ 即可。
我們先用 Netty 實現一個簡單的 伺服器端/使用者端 通訊範例,我們是這樣使用的,那 RocketMQ 基於 Netty 的通訊也應該是這樣使用的,不過是在這個基礎上封裝了一層。主要關注以下幾個點:伺服器端什麼時候初始化的,伺服器端實現的 Handler 做了什麼事?使用者端什麼時候初始化的,使用者端實現的 Handler 做了什麼事?
Netty 伺服器端初始化:初始化的程式碼很關鍵,我們要從原始碼上理解 RocketMQ 的通訊機制,那肯定會看到類似的程式碼。根據上面的流程來看,首先是範例化 bossGroup 和 workerGroup,然後初始化 Channel,從程式碼可以看出我們是在 Pipeline 中新增了自己實現的 Handler,這個 Handler 就是業務自己的邏輯了,那 RocketMQ 要處理資料應該也需要實現相應的 Handler。
public class MyServer {
public static void main(String[] args) throws Exception {
//建立兩個執行緒組 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//建立伺服器端的啟動物件,設定引數
ServerBootstrap bootstrap = new ServerBootstrap();
//設定兩個執行緒組boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//設定伺服器端通道實現型別
.channel(NioServerSocketChannel.class)
//使用匿名內部類的形式初始化Channel物件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//給pipeline管道新增處理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//給workerGroup的EventLoop對應的管道設定處理器
//繫結埠號,啟動伺服器端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//對關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
實現自定義的伺服器端處理器 Handler:自定義的 Handler 需要實現 Netty 定義的 HandlerAdapter,當有可讀事件時就會呼叫這裡的 channelRead() 方法。等下我們看 RocketMQ 通訊機制的時候留意RocketMQ 自定義了哪些 Handler,這些 Handler 有做了什麼事。
/**
* 自定義的Handler需要繼承Netty規定好的 HandlerAdapter 才能被Netty框架所關聯,有點類似SpringMVC的介面卡模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//獲取使用者端傳送過來的訊息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到" + ctx.channel().remoteAddress() + "傳送的訊息:" + byteBuf.toString(CharsetUtil.UTF_8));
//傳送訊息給使用者端
ctx.writeAndFlush(Unpooled.copiedBuffer("伺服器端已收到訊息,記得關注三此君,記得三連", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//發生異常,關閉通道
ctx.close();
}
}
Netty 使用者端初始化:Netty 使用者端,在 RocketMQ 中對應了 Producer/Consumer。在 Producer 啟動中有一步是啟動通訊模組服務,其實就是初始化 Netty 使用者端。使用者端也需要先範例化一個 NioEventLoopGroup,然後將自定義的 handler 新增到 Pipeline,還有很重要的一步是我們需要 connect 連線到 Netty 伺服器端。
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//建立bootstrap啟動引導物件,設定引數
Bootstrap bootstrap = new Bootstrap();
//設定執行緒組
bootstrap.group(eventExecutors)
//設定使用者端的Channel實現型別
.channel(NioSocketChannel.class)
//使用匿名內部類初始化 Pipeline
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//新增使用者端Channel的處理器
ch.pipeline().addLast(new MyClientHandler());
}
})
//connect連線伺服器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//對Channel關閉進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
//關閉執行緒組
eventExecutors.shutdownGracefully();
}
}
}
實現自定義的使用者端處理器 Handler:使用者端處理器也繼承自 Netty 定義的 HandlerAdapter,當 Channel 變得可讀的時候(伺服器端資料返回)會呼叫我們自己實現的 channelRead()。
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//傳送訊息到伺服器端
ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生產者傳送訊息~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收伺服器端傳送過來的訊息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到三此君的訊息,我一定會三連的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
}
}
RocketMQ 通訊模組基於 Netty 實現,總體程式碼量不多。主要是 NettyRemotingServer和NettyRemotingClient,分別對應通訊的伺服器端和使用者端。根據前面的 Netty 範例,我們要理解 RocketMQ 如何基於 Netty 通訊,只需要知道 4 個地方:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基於 NettyRemotingClient 傳送訊息,無論是使用者端還是伺服器端收到資料後都需要 Handler 來處理。
除了同步訊息傳送,RocketMQ 還支援非同步傳送。我們只需要在前面是範例中稍作修改就會得到一個非同步傳送範例,最大的不同在於傳送的時候傳入 SendCallback 接收非同步返回結果回撥。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 範例化訊息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer範例
producer.start();
// 建立訊息,並指定Topic,Tag和訊息體
Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8"));
// SendCallback 接收非同步返回結果的回撥
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("關注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("三連呀!!!%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 如果不再傳送訊息,關閉Producer範例。
producer.shutdown();
}
}
同步傳送個非同步傳送主要的過程都是一樣的,不同點在於同步訊息呼叫 Netty Channel.writeAndFlush 之後是 waitResponse 等待 Broker 返回,而非同步訊息是呼叫預先定義好的回撥函數。
非同步訊息和同步訊息整體差不多,可以說在基於 Netty 實現非同步訊息比同步訊息還要簡單一下,我們這裡主要來看一些不同點:
以上就是 RocketMQ 訊息傳送的主要內容,我們簡單的總結下:
以上就是今天全部的內容,如果覺得本期的內容對你有用的話記得點贊、關注、轉發收藏,這將是對我最大的支援。如果你需要 RocketMQ 相關的所有資料,可以評論區留言,或者關注三此君的公眾號,回覆 mq 即可。
訊息已經傳送給了 Broker,下一期我們將來看看Broker 是如何儲存訊息的,RocketMQ 如何支援百萬級的吞吐量?感謝觀看,我們下期再見