當Broker收到生產者的訊息傳送請求時,會對請求進行處理,從請求中解析傳送的訊息資料,接下來以單個訊息的接收為例,看一下訊息的接收過程。
首先Broker會建立一個MessageExtBrokerInner
物件封裝從請求中解析到的訊息資料,它會將Topic資訊、佇列ID、訊息內容、訊息屬性、傳送訊息時間、傳送訊息的主機地址等資訊設定到MessageExtBrokerInner
中,後續都使用這個MessageExtBrokerInner
物件來操縱訊息。
接下來會判斷是否開啟事務,開啟事務與未開啟事務時呼叫的方法不一致,這裡以未開啟事務為例,看下訊息的持久化過程。
在儲存訊息之前,需要對訊息進行一系列的校驗,保證收到的訊息有效合法。
主要對Broker是否可以寫入訊息進行檢查,包含以下幾個方面:
主要是對主題的長度校驗和訊息屬性的長度校驗。
主要判斷在開啟LMQ(Light Message Queue)時是否超過了最大消費數量。
對訊息進行校驗完畢之後,就可以對訊息進行寫入了。
前面說到Broker將收到的訊息封裝為了MessageExtBrokerInner
物件,這裡會新增以下設定:
(1)設定訊息儲存的時間(當前時間);
(2)計算訊息體的CRC值,並設定到對應的成員變數中;
(3)如果傳送訊息的主機地址或者當前儲存訊息的Broker地址使用了IPV6,設定相應的IPV6標識;
RocketMQ會將訊息資料先寫入記憶體buffer,寫入之前還會做一些校驗:
(1)對訊息屬性資料的長度進行校驗判斷是否超過限定值;
(2)對訊息整體內容長度進行校驗,判斷是否超過最大的長度限制;
校驗通過之後,會根據訊息總體內容的長度對buffer進行初始化,也就是根據需要的大小申請一塊記憶體區域,開始寫入以下資料:
整體儲存格式如下:
RocketMQ將每一條訊息儲存到CommitLog檔案中,儲存檔案的根目錄由設定引數storePathRootDir
決定:
預設每一個CommitLog的檔案大小為1G,如果檔案寫滿會新建一個CommitLog檔案,以該檔案中第一條訊息的偏移量為檔名,小於20位用0補齊。
比如第一個檔案中第一條訊息的偏移量為0,那麼第一個檔案的名稱為00000000000000000000,當這個檔案存滿之後,需要重新建立一個CommitLog檔案,一個檔案大小為1G,
1GB = 1024*1024*1024 = 1073741824 Bytes
,所以下一個檔案就會被命名為00000000001073741824。
在持久化訊息之前,需要知道訊息要寫入哪個CommitLog檔案,RocketMQ通過一個佇列(對應MappedFileQueue
)儲存了記錄了所有的CommitLog檔案(對應MappedFile
),並提供了相關方法獲取到當前正在使用的那個CommitLog。
mappedFileQueue是所有mappedFile的集合,可以理解為CommitLog檔案所在的那個目錄。
MappedFile可以看做是每一個Commitlog檔案的對映物件,每一個CommitLog對於一個MappedFile物件。
如果獲取到的CommitLog取為空或者已寫滿,可能是首次寫入訊息還未建立檔案或者上一次寫入的檔案已達到規定的大小(1G),此時會新建一個CommitLog檔案。
需要注意,在獲取CommitLog之前會加鎖,一是防止在多執行緒情況下建立多個CommitLog,二是接下來要往CommitLog中寫入訊息內容,防止多執行緒情況下資料錯亂。
知道要寫入哪個CommitLog之後,就可以將之前已經寫入緩衝區buffer的訊息資料寫入到CommitLog了。
RocketMQ提供了兩種方式進行寫入:
(1)通過暫存池將資料寫入緩衝區
在開啟暫存池時,會先將資料都寫入位元組緩衝區ByteBuffer
中,ByteBuffer
在申請記憶體時,可以申請JVM堆內記憶體(HeapByteBuffer
),也可以申請堆外記憶體(DirectByteBuffer
),RocketMQ使用的是堆外記憶體DirectByteBuffer
。
暫存池
類似執行緒池,只不過池中存放的是提前申請好的記憶體(ByteBuffer
),RocketMQ會預先申請一些記憶體,從原始碼中可以看到申請的是堆外記憶體,然後放入池中,需要用時從池中獲取,使用完畢後會歸還到池中。
暫存池的開啟條件
需要同時滿足以下三個條件時才會開啟暫存池:
SLAVE
;從條件3中可以看出非同步刷盤時才可以開啟暫存池的使用,因為非同步刷盤,很有可能是積攢了一批訊息,需要同時刷入磁碟,所以使用暫存池可以將之前寫入的訊息先暫存在記憶體緩衝區中,等待執行刷盤時,將積攢的訊息一起刷入磁碟中,而同步刷盤由於每次寫入完畢之後要立刻刷回磁碟,那麼就沒有必要使用暫存池快取資料了。
(2)通過檔案對映
未開啟暫存池時使用檔案對映,使用MappedByteBuffer
對映對應的CommitLog檔案,MappedByteBuffer
是ByteBuffer的子類,它可以將磁碟的檔案內容對映到虛擬地址空間,通過虛擬地址存取實體記憶體中對映的檔案內容,對檔案內容進行操作。
使用MappedByteBuffer
可以減少資料的拷貝,詳細內容可參考【Java】Java中的零拷貝。
訊息寫入流程
瞭解了寫入方式之後,來看下訊息的寫入流程:
CommitLog對應的MappedFile
物件中記錄了當前檔案的寫入位置,首先會判斷準備寫入的位置是否小於檔案總大小,如果小於意味著當前檔案可以進行內容寫入,反之說明此檔案已寫滿,不能繼續下一步,需要返回錯誤資訊;
判斷是否開啟暫存池,開啟暫存池時使用MappedFile
中的ByteBuffer
來開闢共用記憶體,否則使用MappedFile
中的;
MappedByteBuffer
來開闢。
開闢共用記憶體之後,往共用記憶體中寫入的資料,會影響到開闢它那個
ByteBuffer
或者MappedByteBuffer
中;
將之前已經寫入緩衝區的訊息資料寫入到開闢的共用記憶體中;
返回訊息寫入結果,有以下幾種狀態:
需要注意,此時訊息駐留在作業系統的PAGECACHE中,接下來需要根據刷盤策略決定何時將內容刷入到硬碟中。
RocketMQ訊息儲存相關原始碼可參考:【RocketMQ】【原始碼】訊息的儲存
在以上的訊息寫入步驟完成之後,會進行刷盤操作。
有兩種刷盤策略:
同步刷盤:表示訊息寫入到記憶體之後需要立刻刷到磁碟檔案中。
非同步刷盤:表示訊息寫入記憶體成功之後就返回,由MQ定時將資料刷入到磁碟中,會有一定的資料丟失風險。
不管同步刷盤還是非同步刷盤,都是喚醒對應的刷盤執行緒來進行,這裡不對喚醒的具體過程進行講解,如果想了解可參考【RocketMQ】【原始碼】訊息的刷盤機制。
前面講到,暫存池只有在非同步刷盤時才可以啟用,所以設定為同步刷盤時,使用的是檔案對映的方式寫入資料,在同步刷盤時直接通過MappedByteBuffer
的force
方法將資料flush到磁碟檔案即可。
非同步刷盤有開啟暫存池和未開啟兩種情況。
開啟暫存池時,可以分為Commit和Flush兩個階段。
(1)Commit階段
在開啟暫存池時,資料會先寫入緩衝區ByteBuffer
中,並未對映到CommitLog檔案中,所以首先會喚醒Commit執行緒,將ByteBuffer
中的資料寫入到CommitLog對應的FileChannel
中。
(2)Flush階段
資料被寫入FileChannel
之後,就會喚醒Flush執行緒,再呼叫FileChannel
的force方法將資料flush到磁碟。
未開啟暫存池時使用檔案對映的方式,直接喚醒Flush執行緒,呼叫MappedByteBuffer
的force
方法將資料flush到磁碟檔案即可。
通過上面分析訊息的持久化過程,來看下RocketMQ提升效能的一些地方。
(1)RocketMQ在寫入資料到CommitLog時,採用的是順序寫的方式,順序寫比隨機寫檔案效率要高很多。
(2)在非同步刷盤時,可以使用暫存池,暫存池會提前申請好記憶體,申請記憶體是一個比較重的操作,所以避免在訊息寫入時申請記憶體,以此提高效率。
(3)RocketMQ使用了MappedByteBuffer
檔案對映的方式,向CommitLog寫入資料,可以減少資料的拷貝過程。
參考