本文是 RocketMQ 系列的第四篇,下面是前面幾篇的文章,不清楚的話點選看一下吧。
RocketMQ 作為一款優秀的分散式訊息中介軟體,可以為業務方提供高效能低延遲的穩定可靠的訊息服務。其核心優勢是可靠的消費儲存、訊息傳送的高效能和低延遲、強大的訊息堆積能力和訊息處理能力。
從儲存方式來看,主要有幾個方面:
從效率上來講,檔案系統高於KV儲存,KV儲存又高於關係型資料庫。因為直接操作檔案系統肯定是最快的,那麼業界主流的訊息佇列中介軟體,如RocketMQ 、RabbitMQ 、kafka
都是採用檔案系統的方式來儲存訊息。
這篇文章我們來探索一下 RocketMQ 訊息儲存的機制。
這裡我們直接參照官方的流程圖,如下:
由於 commitlog 是一個二進位制檔案,我們沒有辦法直接讀取其內容,所以需要通過程式讀取它的位元組陣列:
public static ByteBuffer read(String path)throws Exception{
File file = new File(path);
FileInputStream fin = new FileInputStream(file);
byte[] bytes = new byte[(int)file.length()];
fin.read(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return buffer;
}
如上程式碼,可以通過傳入檔案的路徑,讀取該檔案所有的內容。為了方便下一步操作,我們把讀取到的位元組陣列轉換為java.nio.ByteBuffer
物件。
在解析之前,我們需要弄明白兩件事:
在上面的圖中,我們已經看到了訊息的格式,包含了19個欄位。關於位元組大小,有的是 4 位元組,有的是 8 位元組,我們不再一一贅述,直接看程式碼。
/**
* commitlog 檔案解析
* @param byteBuffer
* @return
* @throws Exception
*/
public static MessageExt decodeCommitLog(ByteBuffer byteBuffer)throws Exception {
MessageExt msgExt = new MessageExt();
// 1 TOTALSIZE
int storeSize = byteBuffer.getInt();
msgExt.setStoreSize(storeSize);
if (storeSize<=0){
return null;
}
// 2 MAGICCODE
byteBuffer.getInt();
// 3 BODYCRC
int bodyCRC = byteBuffer.getInt();
msgExt.setBodyCRC(bodyCRC);
// 4 QUEUEID
int queueId = byteBuffer.getInt();
msgExt.setQueueId(queueId);
// 5 FLAG
int flag = byteBuffer.getInt();
msgExt.setFlag(flag);
// 6 QUEUEOFFSET
long queueOffset = byteBuffer.getLong();
msgExt.setQueueOffset(queueOffset);
// 7 PHYSICALOFFSET
long physicOffset = byteBuffer.getLong();
msgExt.setCommitLogOffset(physicOffset);
// 8 SYSFLAG
int sysFlag = byteBuffer.getInt();
msgExt.setSysFlag(sysFlag);
// 9 BORNTIMESTAMP
long bornTimeStamp = byteBuffer.getLong();
msgExt.setBornTimestamp(bornTimeStamp);
// 10 BORNHOST
int bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;
byte[] bornHost = new byte[bornhostIPLength];
byteBuffer.get(bornHost, 0, bornhostIPLength);
int port = byteBuffer.getInt();
msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));
// 11 STORETIMESTAMP
long storeTimestamp = byteBuffer.getLong();
msgExt.setStoreTimestamp(storeTimestamp);
// 12 STOREHOST
int storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
byte[] storeHost = new byte[storehostIPLength];
byteBuffer.get(storeHost, 0, storehostIPLength);
port = byteBuffer.getInt();
msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));
// 13 RECONSUMETIMES
int reconsumeTimes = byteBuffer.getInt();
msgExt.setReconsumeTimes(reconsumeTimes);
// 14 Prepared Transaction Offset
long preparedTransactionOffset = byteBuffer.getLong();
msgExt.setPreparedTransactionOffset(preparedTransactionOffset);
// 15 BODY
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
byte[] body = new byte[bodyLen];
byteBuffer.get(body);
msgExt.setBody(body);
}
// 16 TOPIC
byte topicLen = byteBuffer.get();
byte[] topic = new byte[(int) topicLen];
byteBuffer.get(topic);
msgExt.setTopic(new String(topic, CHARSET_UTF8));
// 17 properties
short propertiesLength = byteBuffer.getShort();
if (propertiesLength > 0) {
byte[] properties = new byte[propertiesLength];
byteBuffer.get(properties);
String propertiesString = new String(properties, CHARSET_UTF8);
Map<String, String> map = string2messageProperties(propertiesString);
}
int msgIDLength = storehostIPLength + 4 + 8;
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
msgExt.setMsgId(msgId);
return msgExt;
}
解析方法寫完之後,開始輸出 commitlog 內容。
這裡將 Linux 伺服器上的 commitlog 檔案拷貝到本地進行測試:
public static void main(String[] args) throws Exception {
String filePath = "C:\\Users\\Administrator\\Desktop\\00000000000000000000";
ByteBuffer buffer = read(filePath);
List<MessageExt> messageList = new ArrayList<>();
while (true){
MessageExt message = decodeCommitLog(buffer);
if (message==null){
break;
}
messageList.add(message);
}
for (MessageExt ms:messageList) {
System.out.println("主題:"+ms.getTopic()+" 訊息:"+
new String(ms.getBody())+" 佇列ID:"+ms.getQueueId()+" 儲存地址:"+ms.getStoreHost());
}
}
控制檯成功輸出解析後的內容:
主題:topicClean 訊息:syncMessage 佇列ID:0 儲存地址:/192.168.0.17:10911
在瞭解 ConsumeQueue之前,有必要先了解 MessageQueue 的概念。
我們知道,在傳送訊息的時候,要指定一個 Topic。那麼,在建立 Topic 的時候,有一個很重要的引數MessageQueue
。簡單來說,就是你這個Topic對應了多少個佇列,也就是幾個MessageQueue
,預設是4個。那它的作用是什麼呢 ?
假設我們的 Topic 裡面有 100 條資料,該 Topic 預設是 4 個MessageQueue
,那麼每個MessageQueue
中大約 25 條資料。 然後,這些MessageQueue
是和Broker
繫結在一起的,就是說每個MessageQueue
都可能處於不同的Broker
機器上,這取決於你的佇列數量和Broker叢集。
通過 RocketMQ 控制檯可以看到 Topic 下的 MessageQueue
情況:
能夠看到 Topic 為 topicClean 的主題下,一共有四個 MessageQueue, 由於我的 Broker 只有一個,所以 brokerName 都是一樣的。
既然MessageQueue
是多個,那麼在訊息傳送的時候,勢必要通過某種方式選擇一個佇列。預設的情況下,就是通過輪詢來獲取一個訊息佇列。
檢視 RocketMQ 原始碼,找到方法如下:
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
介紹完了 MessageQueue
,那麼它跟 ConsumeQueue
有什麼關係呢?而 ConsumeQueue
又是什麼?
與 MessageQueue關係
ConsumeQueue
是一組組檔案,而一個 MessageQueue 對應其中一組 ConsumeQueue 檔案,主要的作用是記錄當前 MessageQueue 被哪些消費者組消費到了 Commitlog 中哪一條訊息。
ConsumeQueue
目錄下面是以 Topic 命名的資料夾,然後再下一級是以MessageQueue
佇列ID命名的資料夾,最後才是一個或多個 ConsumeQueue
檔案:
Index 檔案結構如下所示:
從圖中可以看出,Index 檔案分為如下 3 部分,IndexHead,Hash 槽,Index 條目。
IndexHead 的格式如下
欄位 | 解釋 |
---|---|
beginTimestamp | 訊息的最小儲存時間 |
endTimestamp | 訊息的最大儲存時間 |
beginPhyOffset | 訊息的最小偏移量(commitLog 檔案中的偏移量) |
endPhyOffset | 訊息的最大偏移量(commitLog 檔案中的偏移量) |
hashSlotCount | hash 槽個數 |
indexCount | index 條目當前已使用的個數 |
Hash 槽儲存的內容為落在該 Hash 槽內的 Index 的索引(看後面圖示你就會很清楚了)
每個 Index 條目的格式如下
欄位 | 解釋 |
---|---|
hashcode | key 的hashcode |
phyoffset | 訊息的偏移量(commitLog檔案中的偏移量) |
timedif | 該訊息儲存時間與第一條訊息的時間戳的差值,小於0該訊息無效 |
pre index no | 該條目的前一條記錄的Index索引,當hash衝突時,用來構建連結串列 |
key 的組成有如下兩種形式
Unique Key 是在 Producer 端傳送訊息生成的
// DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
Message Key 是我們在傳送訊息的時候設定的,通常具有業務意義,方便我們快速查詢訊息
/**
* 傳送 message key 訊息
*/
@RequestMapping("/containKeySend")
public void containKeySend() {
//指定Topic與Tag,格式: `topicName:tags`
SendResult sendResult = rocketMQTemplate.syncSend("topicClean:tagTest",
MessageBuilder
.withPayload("this is message")
// key:可通過key查詢訊息軌跡,如訊息被誰消費,定位訊息丟失問題。由於是雜湊索引,須保證key儘可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, "123456").build());
System.out.println("傳送延時訊息:" + sendResult.toString());
}
Index 檔案構成過程比較麻煩,你可以把 Index檔案 想成基於檔案實現的 HashMap。
假如說往陣列長度為 10 的HashMap依次放入3個 key 為11,34,21 的資料(以尾插法演示了),HashMap 的結構如下:
將key為11,34,21的資料放到IndexFile中的過程如下(假如hash槽的數量為10)
具體的過程為
從圖中可以看出來,當發生 hash 衝突時 Index 條目的 preIndexNo 屬性充當了連結串列的作用。查詢的過程和 HashMap 基本類似,先定位到槽的位置,然後順著連結串列找就行了。
為了便於理解,我們還是以程式碼的方式,來解析這個檔案。
public static void main(String[] args) throws Exception {
//index索引檔案的路徑
String path = "D:\\software\\IdeaProjects\\cloud\\rockemq-producer\\src\\file\\index\\20230823171341863";
ByteBuffer buffer = DecodeCommitLogTools.read(path);
//該索引檔案中包含訊息的最小儲存時間
long beginTimestamp = buffer.getLong();
//該索引檔案中包含訊息的最大儲存時間
long endTimestamp = buffer.getLong();
//該索引檔案中包含訊息的最大物理偏移量(commitlog檔案偏移量)
long beginPhyOffset = buffer.getLong();
//該索引檔案中包含訊息的最大物理偏移量(commitlog檔案偏移量)
long endPhyOffset = buffer.getLong();
//hashslot個數
int hashSlotCount = buffer.getInt();
//Index條目列表當前已使用的個數
int indexCount = buffer.getInt();
//500萬個hash槽,每個槽佔4個位元組,儲存的是index索引
for (int i=0;i<5000000;i++){
buffer.getInt();
}
//2000萬個index條目
for (int j=0;j<20000000;j++){
//訊息key的hashcode
int hashcode = buffer.getInt();
//訊息對應的偏移量
long offset = buffer.getLong();
//訊息儲存時間和第一條訊息的差值
int timedif = buffer.getInt();
//該條目的上一條記錄的index索引
int pre_no = buffer.getInt();
}
System.out.println(buffer.position()==buffer.capacity());
}
我們看最後輸出的結果為 true,則證明解析的過程無誤。
我們傳送的訊息體中,包含Message Key 或 Unique Key
,那麼就會給它們每一個都構建索引。
這裡重點有兩個:
將當前 Index條目 的索引值,寫在Hash槽absSlotPos
位置上;將Index條目的具體資訊(hashcode/訊息偏移量/時間差值/hash槽的值)
,從起始偏移量absIndexPos
開始,順序按位元組寫入。原始碼如下:
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
//計算key的hash
int keyHash = indexKeyHashMethod(key);
//計算hash槽的座標
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//計算時間差值
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//計算INDEX條目的起始偏移量
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//依次寫入hashcode、訊息偏移量、時間戳、hash槽的值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//將當前INDEX中包含的條目數量寫入HASH槽
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
這樣構建完 Index 索引之後,根據Message Key 或 Unique Key
查詢訊息就簡單了。
RocketMQ
的訊息是儲存到磁碟上的,這樣既能保證斷電後恢復, 又可以讓儲存的訊息量超出記憶體的限制。RocketMQ
為了提高效能,會盡可能地保證磁碟的順序寫。訊息在通過 Producer 寫入 RocketMQ 的時候,有兩種寫磁碟方式,分別為同步刷盤和非同步刷盤。
PAGECACHE
後,立刻通知刷盤執行緒刷盤,然後等待刷盤完成,刷盤執行緒執行完成後喚醒等待的執行緒,返回訊息寫成功的狀態。
PAGECACHE
,寫操作的返回快,吞吐量大;當記憶體裡的訊息量積累到一定程度時,統一觸發寫磁碟動作,快速寫入。
同步刷盤和非同步刷盤,都是通過 Broker
組態檔裡的 flushDiskType
引數設定的,把這個引數被設定成 SYNC_FLUSH
(同步)、ASYNC_FLUSH
(非同步)中的一個。
到這裡 RocketMQ 訊息儲存的幾個主要檔案 Commitlog、ConsumeQueue、Index 都一一講解完畢,然後簡略帶過刷盤機制, 如果你對訊息儲存感興趣的話最好自己拉下原始碼研究一番加深印象,又到了睡大覺的時間點了。
參考資料: