RocketMQ 系列(四) 訊息儲存

2023-09-11 12:01:11

RocketMQ 系列(四) 訊息儲存

本文是 RocketMQ 系列的第四篇,下面是前面幾篇的文章,不清楚的話點選看一下吧。

RocketMQ 作為一款優秀的分散式訊息中介軟體,可以為業務方提供高效能低延遲的穩定可靠的訊息服務。其核心優勢是可靠的消費儲存、訊息傳送的高效能和低延遲、強大的訊息堆積能力和訊息處理能力。

從儲存方式來看,主要有幾個方面:

  • 檔案系統
  • 分散式KV儲存
  • 關係型資料庫

從效率上來講,檔案系統高於KV儲存,KV儲存又高於關係型資料庫。因為直接操作檔案系統肯定是最快的,那麼業界主流的訊息佇列中介軟體,如RocketMQ 、RabbitMQ 、kafka 都是採用檔案系統的方式來儲存訊息。

這篇文章我們來探索一下 RocketMQ 訊息儲存的機制。

1、整體架構

這裡我們直接參照官方的流程圖,如下:

2.3、Commitlog 解析

2.3.1、讀取檔案內容

由於 commitlog 是一個二進位制檔案,我們沒有辦法直接讀取其內容,所以需要通過程式讀取它的位元組陣列:

public static ByteBuffer read(String path)throws Exception{
    File file = new File(path);
    FileInputStream fin = new FileInputStream(file);
    byte[] bytes = new byte[(int)file.length()];
    fin.read(bytes);
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    return buffer;
}

如上程式碼,可以通過傳入檔案的路徑,讀取該檔案所有的內容。為了方便下一步操作,我們把讀取到的位元組陣列轉換為java.nio.ByteBuffer物件。

2.3.2、解析

在解析之前,我們需要弄明白兩件事:

  • Message 的格式,即一條訊息包含哪些欄位;
  • 每個欄位所佔的位元組大小。

在上面的圖中,我們已經看到了訊息的格式,包含了19個欄位。關於位元組大小,有的是 4 位元組,有的是 8 位元組,我們不再一一贅述,直接看程式碼。

/**
     * commitlog 檔案解析
     * @param byteBuffer
     * @return
     * @throws Exception
     */
public static MessageExt decodeCommitLog(ByteBuffer byteBuffer)throws Exception {

    MessageExt msgExt = new MessageExt();

    // 1 TOTALSIZE
    int storeSize = byteBuffer.getInt();
    msgExt.setStoreSize(storeSize);

    if (storeSize<=0){
        return null;
    }

    // 2 MAGICCODE
    byteBuffer.getInt();

    // 3 BODYCRC
    int bodyCRC = byteBuffer.getInt();
    msgExt.setBodyCRC(bodyCRC);

    // 4 QUEUEID
    int queueId = byteBuffer.getInt();
    msgExt.setQueueId(queueId);

    // 5 FLAG
    int flag = byteBuffer.getInt();
    msgExt.setFlag(flag);

    // 6 QUEUEOFFSET
    long queueOffset = byteBuffer.getLong();
    msgExt.setQueueOffset(queueOffset);

    // 7 PHYSICALOFFSET
    long physicOffset = byteBuffer.getLong();
    msgExt.setCommitLogOffset(physicOffset);

    // 8 SYSFLAG
    int sysFlag = byteBuffer.getInt();
    msgExt.setSysFlag(sysFlag);

    // 9 BORNTIMESTAMP
    long bornTimeStamp = byteBuffer.getLong();
    msgExt.setBornTimestamp(bornTimeStamp);

    // 10 BORNHOST
    int bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;
    byte[] bornHost = new byte[bornhostIPLength];
    byteBuffer.get(bornHost, 0, bornhostIPLength);
    int port = byteBuffer.getInt();
    msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));

    // 11 STORETIMESTAMP
    long storeTimestamp = byteBuffer.getLong();
    msgExt.setStoreTimestamp(storeTimestamp);

    // 12 STOREHOST
    int storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
    byte[] storeHost = new byte[storehostIPLength];
    byteBuffer.get(storeHost, 0, storehostIPLength);
    port = byteBuffer.getInt();
    msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));

    // 13 RECONSUMETIMES
    int reconsumeTimes = byteBuffer.getInt();
    msgExt.setReconsumeTimes(reconsumeTimes);

    // 14 Prepared Transaction Offset
    long preparedTransactionOffset = byteBuffer.getLong();
    msgExt.setPreparedTransactionOffset(preparedTransactionOffset);

    // 15 BODY
    int bodyLen = byteBuffer.getInt();
    if (bodyLen > 0) {
        byte[] body = new byte[bodyLen];
        byteBuffer.get(body);
        msgExt.setBody(body);
    }

    // 16 TOPIC
    byte topicLen = byteBuffer.get();
    byte[] topic = new byte[(int) topicLen];
    byteBuffer.get(topic);
    msgExt.setTopic(new String(topic, CHARSET_UTF8));

    // 17 properties
    short propertiesLength = byteBuffer.getShort();
    if (propertiesLength > 0) {
        byte[] properties = new byte[propertiesLength];
        byteBuffer.get(properties);
        String propertiesString = new String(properties, CHARSET_UTF8);
        Map<String, String> map = string2messageProperties(propertiesString);
    }
    int msgIDLength = storehostIPLength + 4 + 8;
    ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
    String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
    msgExt.setMsgId(msgId);

    return msgExt;
}

解析方法寫完之後,開始輸出 commitlog 內容。

2.3.3、輸出 commitlog 內容

這裡將 Linux 伺服器上的 commitlog 檔案拷貝到本地進行測試:

public static void main(String[] args) throws Exception {
    String filePath = "C:\\Users\\Administrator\\Desktop\\00000000000000000000";
    ByteBuffer buffer = read(filePath);
    List<MessageExt> messageList = new ArrayList<>();
    while (true){
        MessageExt message = decodeCommitLog(buffer);
        if (message==null){
            break;
        }
        messageList.add(message);
    }
    for (MessageExt ms:messageList) {
        System.out.println("主題:"+ms.getTopic()+" 訊息:"+
                           new String(ms.getBody())+" 佇列ID:"+ms.getQueueId()+" 儲存地址:"+ms.getStoreHost());
    }
}

控制檯成功輸出解析後的內容:

主題:topicClean 訊息:syncMessage 佇列ID:0 儲存地址:/192.168.0.17:10911

3、ConsumeQueue

在瞭解 ConsumeQueue之前,有必要先了解 MessageQueue 的概念。

3.1、MessageQueue

我們知道,在傳送訊息的時候,要指定一個 Topic。那麼,在建立 Topic 的時候,有一個很重要的引數MessageQueue。簡單來說,就是你這個Topic對應了多少個佇列,也就是幾個MessageQueue,預設是4個。那它的作用是什麼呢 ?

假設我們的 Topic 裡面有 100 條資料,該 Topic 預設是 4 個MessageQueue,那麼每個MessageQueue中大約 25 條資料。 然後,這些MessageQueue是和Broker繫結在一起的,就是說每個MessageQueue都可能處於不同的Broker機器上,這取決於你的佇列數量和Broker叢集。

通過 RocketMQ 控制檯可以看到 Topic 下的 MessageQueue情況:

能夠看到 Topic 為 topicClean 的主題下,一共有四個 MessageQueue, 由於我的 Broker 只有一個,所以 brokerName 都是一樣的。

既然MessageQueue是多個,那麼在訊息傳送的時候,勢必要通過某種方式選擇一個佇列。預設的情況下,就是通過輪詢來獲取一個訊息佇列。

檢視 RocketMQ 原始碼,找到方法如下:

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

3.2、ConsumeQueue 結構

介紹完了 MessageQueue,那麼它跟 ConsumeQueue 有什麼關係呢?而 ConsumeQueue 又是什麼?

與 MessageQueue關係

ConsumeQueue是一組組檔案,而一個 MessageQueue 對應其中一組 ConsumeQueue 檔案,主要的作用是記錄當前 MessageQueue 被哪些消費者組消費到了 Commitlog 中哪一條訊息。

ConsumeQueue 目錄下面是以 Topic 命名的資料夾,然後再下一級是以MessageQueue佇列ID命名的資料夾,最後才是一個或多個 ConsumeQueue檔案:

4.1、Index 結構

Index 檔案結構如下所示:

從圖中可以看出,Index 檔案分為如下 3 部分,IndexHead,Hash 槽,Index 條目。

IndexHead 的格式如下

欄位 解釋
beginTimestamp 訊息的最小儲存時間
endTimestamp 訊息的最大儲存時間
beginPhyOffset 訊息的最小偏移量(commitLog 檔案中的偏移量)
endPhyOffset 訊息的最大偏移量(commitLog 檔案中的偏移量)
hashSlotCount hash 槽個數
indexCount index 條目當前已使用的個數

Hash 槽儲存的內容為落在該 Hash 槽內的 Index 的索引(看後面圖示你就會很清楚了)

每個 Index 條目的格式如下

欄位 解釋
hashcode key 的hashcode
phyoffset 訊息的偏移量(commitLog檔案中的偏移量)
timedif 該訊息儲存時間與第一條訊息的時間戳的差值,小於0該訊息無效
pre index no 該條目的前一條記錄的Index索引,當hash衝突時,用來構建連結串列

key 的組成有如下兩種形式

  1. Topic#Unique Key
  2. Topic#Message Key

Unique Key 是在 Producer 端傳送訊息生成的

// DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

Message Key 是我們在傳送訊息的時候設定的,通常具有業務意義,方便我們快速查詢訊息

/**
     * 傳送 message key 訊息
     */
@RequestMapping("/containKeySend")
public void containKeySend() {
    //指定Topic與Tag,格式: `topicName:tags`
    SendResult sendResult = rocketMQTemplate.syncSend("topicClean:tagTest",
        MessageBuilder
            .withPayload("this is message")
            // key:可通過key查詢訊息軌跡,如訊息被誰消費,定位訊息丟失問題。由於是雜湊索引,須保證key儘可能唯一
            .setHeader(MessageConst.PROPERTY_KEYS, "123456").build());
    System.out.println("傳送延時訊息:" + sendResult.toString());
}

Index 檔案構成過程比較麻煩,你可以把 Index檔案 想成基於檔案實現的 HashMap

假如說往陣列長度為 10 的HashMap依次放入3個 key 為11,34,21 的資料(以尾插法演示了),HashMap 的結構如下:

將key為11,34,21的資料放到IndexFile中的過程如下(假如hash槽的數量為10

具體的過程為

  1. 將訊息順序放到 Index 條目中,將 11 放到 index=1 的位置(用index[1]表示),11%1=1,算出 hash 槽的位置是 1,存的值是 0(剛開始都是0,用hash[0]表示),將 index[1].preIndexNo=hash[0]=0,hash[0]=1(1 為 index 陣列下標)
  2. 將 34 放到 index[2],34%10=4,index[2].preIndexNo=hash[0]=0
  3. 將 21 放到 index[3],21%10=1,index[3].preIndexNo=hash[1]=1

從圖中可以看出來,當發生 hash 衝突時 Index 條目的 preIndexNo 屬性充當了連結串列的作用。查詢的過程和 HashMap 基本類似,先定位到槽的位置,然後順著連結串列找就行了。

4.2、解析

為了便於理解,我們還是以程式碼的方式,來解析這個檔案。

public static void main(String[] args) throws Exception {
    //index索引檔案的路徑
    String path = "D:\\software\\IdeaProjects\\cloud\\rockemq-producer\\src\\file\\index\\20230823171341863";
    ByteBuffer buffer = DecodeCommitLogTools.read(path);
    //該索引檔案中包含訊息的最小儲存時間
    long beginTimestamp = buffer.getLong();
    //該索引檔案中包含訊息的最大儲存時間
    long endTimestamp = buffer.getLong();
    //該索引檔案中包含訊息的最大物理偏移量(commitlog檔案偏移量)
    long beginPhyOffset = buffer.getLong();
    //該索引檔案中包含訊息的最大物理偏移量(commitlog檔案偏移量)
    long endPhyOffset = buffer.getLong();
    //hashslot個數
    int hashSlotCount = buffer.getInt();
    //Index條目列表當前已使用的個數
    int indexCount = buffer.getInt();

    //500萬個hash槽,每個槽佔4個位元組,儲存的是index索引
    for (int i=0;i<5000000;i++){
        buffer.getInt();
    }
    //2000萬個index條目
    for (int j=0;j<20000000;j++){
        //訊息key的hashcode
        int hashcode = buffer.getInt();
        //訊息對應的偏移量
        long offset = buffer.getLong();
        //訊息儲存時間和第一條訊息的差值
        int timedif = buffer.getInt();
        //該條目的上一條記錄的index索引
        int pre_no = buffer.getInt();
    }
    System.out.println(buffer.position()==buffer.capacity());
}

我們看最後輸出的結果為 true,則證明解析的過程無誤。

4.3、構建索引

我們傳送的訊息體中,包含Message Key 或 Unique Key,那麼就會給它們每一個都構建索引。

這裡重點有兩個:

  • 根據訊息 Key 計算 Hash 槽的位置;
  • 根據 Hash 槽的數量和 Index 索引來計算 Index 條目的起始位置。

將當前 Index條目 的索引值,寫在Hash槽absSlotPos位置上;將Index條目的具體資訊(hashcode/訊息偏移量/時間差值/hash槽的值),從起始偏移量absIndexPos開始,順序按位元組寫入。原始碼如下:

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //計算key的hash
        int keyHash = indexKeyHashMethod(key);
        //計算hash槽的座標
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        try {

            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            //計算時間差值
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            //計算INDEX條目的起始偏移量
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + this.indexHeader.getIndexCount() * indexSize;

            //依次寫入hashcode、訊息偏移量、時間戳、hash槽的值
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            //將當前INDEX中包含的條目數量寫入HASH槽
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                 + "; index max num = " + this.indexNum);
    }

    return false;
}

這樣構建完 Index 索引之後,根據Message Key 或 Unique Key查詢訊息就簡單了。

5、刷盤機制

RocketMQ 的訊息是儲存到磁碟上的,這樣既能保證斷電後恢復, 又可以讓儲存的訊息量超出記憶體的限制。RocketMQ 為了提高效能,會盡可能地保證磁碟的順序寫。訊息在通過 Producer 寫入 RocketMQ 的時候,有兩種寫磁碟方式,分別為同步刷盤和非同步刷盤。

e29a44ed868e4978a8bb3db3a7be0eca.png
  1. 同步刷盤:在返回寫成功狀態時,訊息已經被寫入磁碟。具體流程是,訊息寫入記憶體的 PAGECACHE 後,立刻通知刷盤執行緒刷盤,然後等待刷盤完成,刷盤執行緒執行完成後喚醒等待的執行緒,返回訊息寫成功的狀態。
    • 優點:效能高。
    • 缺點:Master 宕機,磁碟損壞的情況下,會丟失少量的訊息, 導致MQ的訊息狀態和生產者/消費者的訊息狀態不一致。
  2. 非同步刷盤:在返回寫成功狀態時,訊息可能只是被寫入了記憶體的 PAGECACHE,寫操作的返回快,吞吐量大;當記憶體裡的訊息量積累到一定程度時,統一觸發寫磁碟動作,快速寫入。
    • 優點:可以保持MQ的訊息狀態和生產者/消費者的訊息狀態一致
    • 缺點:效能比非同步的低

同步刷盤和非同步刷盤,都是通過 Broker 組態檔裡的 flushDiskType 引數設定的,把這個引數被設定成 SYNC_FLUSH(同步)、ASYNC_FLUSH (非同步)中的一個。

到這裡 RocketMQ 訊息儲存的幾個主要檔案 Commitlog、ConsumeQueue、Index 都一一講解完畢,然後簡略帶過刷盤機制, 如果你對訊息儲存感興趣的話最好自己拉下原始碼研究一番加深印象,又到了睡大覺的時間點了。

參考資料: