BrokerController
初始化的過程中,呼叫registerProcessor
方法註冊了處理器,在註冊處理器的程式碼中可以看到建立了處理訊息傳送的處理器物件SendMessageProcessor
,然後將其註冊到遠端服務中:
public class BrokerController {
// 初始化
public boolean initialize() throws CloneNotSupportedException {
// ...
// 註冊處理器
this.registerProcessor();
// ...
}
// 註冊處理器
public void registerProcessor() {
/**
* 傳送訊息處理器
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
// ...
// 註冊訊息傳送處理器
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
// 省略其他註冊...
}
}
在Broker收到生產者的傳送訊息請求時,會進入到SendMessageProcessor
的processRequest
方法中處理請求,然後又會呼叫asyncProcessRequest
非同步處理訊息,然後從請求中解析請求頭資料,並判斷是否是批次傳送訊息的請求,如果是批次傳送訊息呼叫asyncSendBatchMessage
方法處理,否則呼叫asyncSendMessage
方法處理單個訊息:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 處理請求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
// 處理請求
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
// 非同步處理請求
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// 解析請求頭
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
// ...
if (requestHeader.isBatch()) {
// 批次訊息傳送處理
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// 單個訊息傳送處理
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
// 單個訊息傳送處理
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// ...
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 是否使用事務
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 事務處理
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 訊息持久化
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
}
以單個訊息的傳送處理方法asyncSendMessage
為例看一下訊息的接收過程:
MessageExtBrokerInner
物件,對訊息的相關內容進行封裝,將主題資訊、佇列ID、訊息內容、訊息屬性、傳送訊息時間、傳送訊息的主機地址等資訊設定到MessageExtBrokerInner中brokerController
的getMessageStore
方法獲取MessageStore
物件,然後呼叫asyncPutMessage
方法對訊息進行持久化儲存public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 單個訊息傳送處理
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// ...
// 建立MessageExtBrokerInner物件,之後使用這個物件來操縱訊息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 設定主題
msgInner.setTopic(requestHeader.getTopic());
// 設定訊息所在的佇列ID
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
// 設定訊息內容
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
// 設定屬性
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
// 設定傳送訊息時間
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
// 設定傳送訊息的主機地址
msgInner.setBornHost(ctx.channel().remoteAddress());
// 設定儲存訊息的主機地址
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
// 屬性中新增叢集名稱
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
// 如果屬性中包含PROPERTY_WAIT_STORE_MSG_OK
if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
// 設定訊息屬性
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
} else {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 是否使用事務
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 事務處理
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 訊息寫入
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
// 返回訊息持久化結果
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
}
MessageStore
是一個介面,在BrokerController
的初始化方法中可以看到,具體使用的是DefaultMessageStore
:
public class BrokerController {
private MessageStore messageStore;
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
// ...
if (result) {
try {
// 建立DefaultMessageStore
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// ...
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// 獲取MessageStore
public MessageStore getMessageStore() {
return messageStore;
}
}
DefaultMessageStore
中有一個CommitLog
型別的成員變數,在DefaultMessageStore
中的建構函式中可以看到,如果啟用了Dleger,使用的是DLedgerCommitLog
,DLedgerCommitLog
是CommitLog
的子類,如果未啟用Dleger,就使用CommitLog
自己(接下來會以CommitLog
為例)。
在DefaultMessageStore
的asyncPutMessage
方法中,首先進行了一系列的合法性校驗,校驗通過後會呼叫CommitLog
的asyncPutMessage
進行訊息寫入:
public class DefaultMessageStore implements MessageStore {
private final CommitLog commitLog; // CommitLog
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
// 如果啟用了Dleger
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// 使用DLedgerCommitLog
this.commitLog = new DLedgerCommitLog(this);
} else {
// 否則使用CommitLog
this.commitLog = new CommitLog(this);
}
// ...
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
// 校驗儲存狀態
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// 校驗訊息合法性
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// 進行一系列校驗
PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
}
long beginTime = this.getSystemClock().now();
// 呼叫CommitLog的asyncPutMessage方法寫入訊息
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
return putResultFuture;
}
}
checkStoreStatus
主要對Broker是否可以寫入訊息進行檢查,包含以下幾個方面:
MessageStore
是否已經處於關閉狀態,如果處於關閉狀態不再受理訊息的儲存 private PutMessageStatus checkStoreStatus() {
// 是否處於停止狀態
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// 是否SLAVE角色
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("broke role is slave, so putMessage is forbidden");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// 是否可寫
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
this.printTimes.set(0);
}
// 作業系統是否處於PAGECACHE繁忙狀態
if (this.isOSPageCacheBusy()) {
return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;
}
checkMessage
方法主要是對主題的長度校驗和訊息屬性的長度校驗:
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
// 如果主題的長度大於最大值
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
// 如果訊息屬性長度大於最大值
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
checkLmqMessage
主要判斷在開啟LMQ(Light Message Queue)時是否超過了最大消費數量:
private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) {
// 如果訊息屬性不為空、存在PROPERTY_INNER_MULTI_DISPATCH屬性、並且超過了最大消費數量
if (msg.getProperties() != null
&& StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
&& this.isLmqConsumeQueueNumExceeded()) {
return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED;
}
return PutMessageStatus.PUT_OK;
}
private boolean isLmqConsumeQueueNumExceeded() {
// 開啟了LMQ && 開啟了多個佇列分發 && 消費數量大於了限定值
if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch()
&& this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) {
return true;
}
return false;
}
對訊息進行校驗完畢之後,呼叫了CommitLog
的asyncPutMessage
進行訊息寫入,為了簡單起見,這裡我們先不考慮事務,處理流程如下:
首先對訊息的相關屬性進行了設定,主要包括以下內容
獲取當前執行緒繫結的PutMessageThreadLocal物件,裡面有一個MessageExtEncoder
型別的成員變數,呼叫它的encode方法可以對訊息進行編碼,將資料先寫入記憶體buffer,然後呼叫MessageExtBrokerInner
的setEncodedBuff
方法將buffer設定到encodedBuff
中
加鎖,從mappedFileQueue
中獲取上一次使用的對映檔案mappedFile
,並更新訊息的儲存時間, 如果mappedFile
為空或者已寫滿,說明是第一次寫入訊息還沒有建立檔案或者上一次寫入的檔案已達到規定的大小,需要新建一個檔案,如果新建檔案為空列印錯誤紀錄檔並返回結果
mappedFile可以看做是每一個Commitlog檔案的對映物件,Commitlog檔案的大小限定為1G
mappedFileQueue是所有mappedFile的集合,可以理解為CommitLog檔案所在的目錄
呼叫mappedFile
的appendMessage
方法向檔案中追加訊息資料,在呼叫方法時傳入了回撥函數appendMessageCallback,在CommitLog的建構函式中可以看到是DefaultAppendMessageCallback
型別的,所以會進入到DefaultAppendMessageCallback中進行訊息寫入,如果寫入成功,資料會留在作業系統的PAGECACHE中
呼叫submitFlushRequest
方法執行刷盤策略,判斷是否需要立刻將PAGECACHE中的資料刷到磁碟
public class CommitLog {
// 所有mappedFile集合
protected final MappedFileQueue mappedFileQueue;
// ThreadLocal
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
// 寫入訊息的回撥函數
private final AppendMessageCallback appendMessageCallback;
public CommitLog(final DefaultMessageStore defaultMessageStore) { // 建構函式
//...
// 建立回撥函數
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
//...
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 設定儲存時間
msg.setStoreTimestamp(System.currentTimeMillis());
// 設定訊息的CRC值
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// 寫入結果
AppendMessageResult result = null;
// 獲取儲存統計服務
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// 獲取主題
String topic = msg.getTopic();
// 獲取事務型別
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 省略事務相關處理
}
// 獲取傳送訊息的主機地址
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6
msg.setBornHostV6Flag(); // 設定IPV6標識
}
// 獲取儲存訊息的主機地址
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag(); // 設定IPV6標識
}
// 獲取當前執行緒繫結的PutMessageThreadLocal物件
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// 呼叫encode方法對訊息進行編碼,並寫入buffer
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
// 將儲存編碼訊息的buffer設定到msg中
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
// 建立PutMessageContext
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 加鎖
putMessageLock.lock();
try {
// 獲取上一次寫入的檔案
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 獲取系統時間戳
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// 再次更新儲存時間戳,保證全域性順序
msg.setStoreTimestamp(beginLockTimestamp);
// 如果mapppedFile為空或者已滿,說明是第一次寫入訊息還沒有建立檔案或者上一次寫入的檔案已滿,需要新建一個檔案
if (null == mappedFile || mappedFile.isFull()) {
// 使用偏移量0建立一個新的檔案
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
// 如果依舊為空
if (null == mappedFile) {
// 提示錯誤
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 寫入訊息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// ...
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} finally {
beginTimeInLock = 0;
putMessageLock.unlock();
}
// ...
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 統計相關
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
// 執行刷盤
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
// 返回結果
return putMessageResult;
});
}
}
MessageExtEncoder
是CommitLog
的一個內部類,它被CommitLog
的另外一個內部類PutMessageThreadLocal
所參照,ThreadLocal
一般用於多執行緒環境下,為每個執行緒建立自己的副本變數,從而互不影響,PutMessageThreadLocal在建構函式中對MessageExtEncoder進行了範例化,並指定了建立緩衝區的大小:
public class CommitLog {
// ThreadLocal
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
// 新增訊息的ThreadLocal物件
static class PutMessageThreadLocal {
private MessageExtEncoder encoder; // 參照MessageExtEncoder
private StringBuilder keyBuilder;
PutMessageThreadLocal(int size) {
// 建立MessageExtEncoder,size用來指定分配記憶體的大小
encoder = new MessageExtEncoder(size);
keyBuilder = new StringBuilder();
}
// ...
}
}
MessageExtEncoder
中使用了ByteBuffer
作為訊息內容存放的緩衝區,上面可知緩衝區的大小是在PutMessageThreadLocal
的建構函式中指定的,MessageExtEncoder的
encode方法中對訊息進了編碼並將資料寫入分配的緩衝區:
public class CommitLog {
// MessageExtEncoder
public static class MessageExtEncoder {
// 位元組緩衝區,儲存訊息內容的buffer
private final ByteBuffer encoderBuffer;
MessageExtEncoder(final int size) {
// 分配記憶體
this.encoderBuffer = ByteBuffer.allocateDirect(size);
this.maxMessageSize = size;
}
// 對訊息進行編碼並寫入buffer
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
// 訊息屬性資料
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
// 屬性資料長度
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
// 校驗長度是否超過最大值
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
// 獲取主題資料
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;// 主題資料長度
// 獲取訊息體內容長度
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
// 總訊息內容長度
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// 是否超過最大長度限制
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 初始化
this.resetByteBuffer(encoderBuffer, msgLen);
// 1 寫入訊息長度
this.encoderBuffer.putInt(msgLen);
// 2 寫入魔數
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 寫入訊息體CRC校驗和
this.encoderBuffer.putInt(msgInner.getBodyCRC());
// 4 寫入佇列ID
this.encoderBuffer.putInt(msgInner.getQueueId());
// 5 寫入標識
this.encoderBuffer.putInt(msgInner.getFlag());
// 6 佇列的偏移量, 稍後寫入
this.encoderBuffer.putLong(0);
// 7 檔案的物理偏移量, 稍後寫入
this.encoderBuffer.putLong(0);
// 8 寫入系統標識
this.encoderBuffer.putInt(msgInner.getSysFlag());
// 9 寫入傳送訊息的時間戳
this.encoderBuffer.putLong(msgInner.getBornTimestamp());
// 10 寫入傳送訊息的主機地址
socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
// 11 寫入儲存時間戳
this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
// 12 寫入儲存訊息的主機地址
socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
// 15 寫入訊息體長度
this.encoderBuffer.putInt(bodyLength);
if (bodyLength > 0)
this.encoderBuffer.put(msgInner.getBody());// 寫入訊息內容
// 16 寫入主題長度
this.encoderBuffer.put((byte) topicLength);
// 寫入主題
this.encoderBuffer.put(topicData);
// 17 寫入屬性長度
this.encoderBuffer.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.encoderBuffer.put(propertiesData); // 寫入屬性資料
encoderBuffer.flip();
return null;
}
}
}
前面提到MappedFile
可以看做是每一個Commitlog檔案的對映,裡面記錄了檔案的大小以及資料已經寫入的位置,還有兩個位元組緩衝區ByteBuffer和MappedByteBuffer,它們的繼承關係如下:
ByteBuffer:位元組緩衝區,用於在記憶體中分配空間,可以在JVM堆中分配記憶體(HeapByteBuffer),也可以在堆外分配記憶體(DirectByteBuffer)。
MappedByteBuffer:是ByteBuffer的子類,它是將磁碟的檔案內容對映到虛擬地址空間,通過虛擬地址存取實體記憶體中對映的檔案內容,也叫檔案對映,可以減少資料的拷貝。
MappedFile提供了兩種方式來進行內容的寫入,對應不同的init方法:
第一種通過ByteBuffer分配緩衝區並將內容寫入緩衝區,並且使用了池化技術對記憶體進行管理,需要時進行申請,使用完畢後回收,類似於資料庫連線池。
第二種是通過MappedByteBuffer,對CommitLog進行檔案對映,然後進行訊息寫入。
public class MappedFile extends ReferenceResource {
// 記錄檔案的寫入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 檔案大小
protected int fileSize;
// 位元組buffer
protected ByteBuffer writeBuffer = null;
// 檔案對映
private MappedByteBuffer mappedByteBuffer;
// 池化技術,類似執行緒池,只不過池中存放的是申請的記憶體
protected TransientStorePool transientStorePool = null;
// 初始化
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
// 使用池化技術,從池中獲取一塊記憶體
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
// 初始化
private void init(final String fileName, final int fileSize) throws IOException {
// ...
try {
// 獲取檔案
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 進行檔案對映
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
// ...
} catch (IOException e) {
// ...
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
}
經過之前的步驟,訊息內容已經寫入到記憶體緩衝區中,並且也知道準備進行寫入的CommitLog對應的對映檔案,接下來就可以呼叫MappedFile的appendMessagesInner
方法將記憶體中的內容寫入對映檔案,處理邏輯如下:
MappedFile
中記錄了檔案的寫入位置,獲取準備寫入的位置,如果寫入的位置小於檔案大小,意味著當前檔案可以進行內容寫入,反之說明此檔案已寫滿,不能繼續下一步,需要返回錯誤資訊
如果writeBuffer不為空,使用writeBuffer,否則使用mappedByteBuffer的slice
方法建立一個與MappedFile
共用的記憶體區byteBuffer,設定byteBuffer的寫入位置,之後通過byteBuffer來進行訊息寫入,由於是共用記憶體區域,所以寫入的內容會影響到writeBuffer或者mappedByteBuffer中
呼叫回撥函數的doAppend方法進行寫入,前面可知回撥函數是DefaultAppendMessageCallback
型別的
更新MappedFile
寫入位置,返回寫入結果
public class MappedFile extends ReferenceResource {
// 記錄檔案的寫入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 檔案大小
protected int fileSize;
// 位元組buffer
protected ByteBuffer writeBuffer = null;
// 檔案對映
private MappedByteBuffer mappedByteBuffer;
// 寫入訊息
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
// 呼叫appendMessagesInner
return appendMessagesInner(msg, cb, putMessageContext);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
// 獲取寫入位置
int currentPos = this.wrotePosition.get();
// 如果寫指標小於檔案大小
if (currentPos < this.fileSize) {
// 如果writeBuffer不為空,使用writeBuffer的slice方法建立共用記憶體區,否則使用mappedByteBuffer
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 設定共用記憶體區的寫入位置
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) { // 單個訊息處理
// 通過共用記憶體區byteBuffer寫入資料
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) { // 批次訊息
// 通過共用記憶體區byteBuffer寫入資料
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新MappedFile的寫入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
}
進入到DefaultAppendMessageCallback
的doAppend
方法中,首先來看方法的入參:
方法的處理邏輯如下:
計算檔案要寫入位置偏移量:檔案起始位置偏移量 + 準備寫入位置的偏移量
從訊息寫入上下文中獲取主題所屬佇列的KEY,根據KEY從主題佇列路由表中獲取佇列偏移量,如果獲取為空,將偏移量初始化為0並加入到路由表中
從msgInner中獲取之前已經寫入到記憶體的訊息資料preEncodeBuffer
,並獲取訊息內容的長度
校驗是否有足夠的空間寫入資料,如果訊息長度 + END_FILE_MIN_BLANK_LENGTH
(預留空間大小) 大於剩餘空間,說明超出了限定的檔案大小,此時只將檔案大小和魔數寫入檔案,然後返回寫入結果,結果型別為END_OF_FILE
(超過檔案大小)。
這裡可以看出每個CommitLog檔案需要預留一部分空間(8個位元組)用於儲存檔案大小和魔數。
計算佇列偏移量在preEncodeBuffer
中的位置,之前在編碼訊息步驟時並未寫入佇列的偏移量值的大小,這裡需要找到對應位置更新佇列偏移量的值
再次更新訊息的儲存時間,並將preEncodeBuffer的內容寫入檔案共用緩衝區byteBuffer,**此時訊息內容已經寫入檔案對應的記憶體buffer中,駐留在作業系統的PAGECACHE中,接下來需要根據刷盤策略決定何時將內容儲存到硬碟中。 **
訊息寫入結果
public class CommitLog {
class DefaultAppendMessageCallback implements AppendMessageCallback {
// 預留空間大小,8個位元組
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// 計算寫入位置物理偏移量:檔案起始位置 + 準備寫入位置的偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// 獲取訊息佇列資訊
String key = putMessageContext.getTopicQueueTableKey();
// 從主題佇列路由表中獲取佇列偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// 如果偏移量為空
if (null == queueOffset) {
queueOffset = 0L; // 初始化為0
// 新增到路由表中
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
if (!multiDispatchWrapResult) {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 如果開啟事務需要特殊處理
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
// ...
// 獲取之前已經寫入到buffer的訊息資料
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
// 獲取資料長度
final int msgLen = preEncodeBuffer.getInt(0);
// 校驗是否有足夠的空間寫入資料,如果訊息長度 + 預留空間大小 大於最大值
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
// 1 設定檔案大小
this.msgStoreItemMemory.putInt(maxBlank);
// 2 寫入魔數
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 開始時間
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 將檔案大小和魔數寫入buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
// 返回寫入結果,由於剩餘空間不足以寫入訊息內容,這裡返回型別為END_OF_FILE
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// 計算佇列偏移量的位置
int pos = 4 + 4 + 4 + 4 + 4;
// 6 寫入佇列偏移量
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7 寫入物理偏移量
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8 系統標識, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen; // 計算儲存時間戳的寫入位置
// 更新新儲存時間戳
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 將preEncodeBuffer的資料寫入byteBuffer
byteBuffer.put(preEncodeBuffer);
// 清空buffer
msgInner.setEncodedBuff(null);
// 設定返回結果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
break;
default:
break;
}
return result;
}
}
}
由於篇幅原因,刷盤機制將另寫一篇文章。
總結
參考
丁威、周繼鋒《RocketMQ技術內幕》
https://github.com/apache/rocketmq/blob/develop/docs/cn/Example_LMQ.md
RocketMQ版本:4.9.3