Broker在啟動的時候會註冊定時任務,定時清理過期的資料,預設是每10s執行一次,分別清理CommitLog檔案和ConsumeQueue檔案:
public class DefaultMessageStore implements MessageStore {
// Service that cleans up CommitLog files
private final CleanCommitLogService cleanCommitLogService;
// Service that cleans up ConsumeQueue files
private final CleanConsumeQueueService cleanConsumeQueueService;
public void start() throws Exception {
// ...
// Register the scheduled tasks
this.addScheduleTask();
//...
}
private void addScheduleTask() {
// ...
// Register the periodic cleanup task: the first run happens after an initial delay of
// 1000 * 60 ms, then it repeats every getCleanResourceInterval() milliseconds
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// Clean up expired data
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
// ...
}
private void cleanFilesPeriodically() {
// Run the CommitLog cleanup first
this.cleanCommitLogService.run();
// Then run the ConsumeQueue cleanup
this.cleanConsumeQueueService.run();
}
}
CommitLog檔案清理的邏輯主要在CleanCommitLogService中,它是DefaultMessageStore的內部類,呼叫了deleteExpiredFiles方法刪除過期檔案:
public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        /**
         * Runs one CommitLog cleanup pass: deletes expired files, then retries
         * files whose deletion is still pending. Any failure is logged as a
         * warning and never propagated to the caller.
         */
        public void run() {
            try {
                deleteExpiredFiles();
                redeleteHangedFile();
            } catch (Throwable failure) {
                final String message = getServiceName() + " service has exception. ";
                DefaultMessageStore.log.warn(message, failure);
            }
        }
    }
}
在執行清理任務之前,首先會獲取一些設定引數,然後通過isTimeToDelete方法判斷是否到了清理檔案的時間,通過isSpaceToDelete方法判斷磁碟使用率是否超過了閾值。處於以下三種情況之一時,將會執行檔案清理操作:(1)到達了設定的清理時間點(timeup);(2)磁碟使用率超過了閾值(spacefull);(3)手動觸發了刪除(manualDelete):
public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        /**
         * Deletes expired CommitLog files when at least one trigger holds:
         * the configured deletion time has been reached, disk usage is above
         * the threshold, or a manual deletion has been requested.
         */
        private void deleteExpiredFiles() {
            // Retention period, still expressed in hours here
            final long reservedHours = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            // Pause between two consecutive file deletions
            final int pauseBetweenDeletes = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            // Grace period before a still-referenced file is destroyed forcibly
            final int forceDestroyAfter = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

            final boolean timeup = this.isTimeToDelete();
            final boolean spacefull = this.isSpaceToDelete();
            final boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

            // None of the three triggers fired: nothing to do in this pass
            if (!timeup && !spacefull && !manualDelete) {
                return;
            }

            if (manualDelete) {
                this.manualDeleteFileSeveralTimes--;
            }

            // Immediate cleanup needs both the config switch and the flag set by isSpaceToDelete()
            final boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
            log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                reservedHours, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);

            // Hours -> milliseconds
            final long reservedMillis = reservedHours * 60 * 60 * 1000;
            final int deleted = DefaultMessageStore.this.commitLog.deleteExpiredFile(reservedMillis, pauseBetweenDeletes,
                forceDestroyAfter, cleanAtOnce);

            // Nothing was deleted although the disk is filling up
            if (deleted <= 0 && spacefull) {
                log.warn("disk space will be full soon, but delete file failed.");
            }
        }
    }
}
deleteWhen:清理時間點,預設是凌晨四點:
public class MessageStoreConfig {
// Time at which file deletion is triggered; the default value is "04"
// (described by the surrounding article as 4 a.m.)
@ImportantField
private String deleteWhen = "04";
}
isTimeToDelete中首先會獲取設定的清理時間,預設是早上4點,然後判斷當前時間是否已經到達了這個時間點:
/**
 * Tells whether the configured deletion time has been reached.
 *
 * @return the result of {@code UtilAll.isItTimeToDo} for the configured
 *         {@code deleteWhen} value; an info line is logged when it is {@code true}
 */
private boolean isTimeToDelete() {
    final String deleteWhen = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
    final boolean due = UtilAll.isItTimeToDo(deleteWhen);
    if (due) {
        DefaultMessageStore.log.info("it's time to reclaim disk space, " + deleteWhen);
    }
    return due;
}
RocketMQ設定了兩個閾值,如果磁碟使用率超過了這些閾值,需要立刻進行清理:
class CleanCommitLogService {
// Disk-usage warning threshold, read from the system property
// "rocketmq.broker.diskSpaceWarningLevelRatio"; falls back to 0.90 when the property is unset
private final double diskSpaceWarningLevelRatio =
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
// Disk-usage threshold for forced cleanup, read from the system property
// "rocketmq.broker.diskSpaceCleanForciblyRatio"; falls back to 0.85 when the property is unset
private final double diskSpaceCleanForciblyRatio =
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
}
計算磁碟使用率
接下來對磁碟使用率最小的那個值minPhysicRatio進行判斷:
// Decides whether disk usage requires a cleanup. As a side effect it resets and possibly sets
// cleanImmediately, publishes the set of "full" store paths to the CommitLog, and toggles the
// disk-full running flag.
private boolean isSpaceToDelete() {
// Maximum allowed disk usage from config, converted from a percentage to a fraction
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
cleanImmediately = false;
{
// CommitLog storage location; it is split on MULTI_PATH_SPLITTER into individual paths
String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
// Used-space ratio of the partition holding this path
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio; // track the lowest usage ratio among all store paths
minStorePath = storePathPhysic;
}
// This path's usage exceeds the forced-cleanup threshold (diskSpaceCleanForciblyRatio)
if (physicRatio > diskSpaceCleanForciblyRatio) {
// Record it as a full store path
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) { // even the least-used path is above the warning level
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}
// Request immediate cleanup
cleanImmediately = true;
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {// the least-used path is above the forced-cleanup threshold (but not the warning level)
cleanImmediately = true;
} else {
// Otherwise disk usage is considered normal
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}
// A negative minPhysicRatio means a path could not be measured (getDiskPartitionSpaceUsedPercent
// returns -1 for an empty path or a missing file); that case, or usage above the configured
// maximum, triggers a cleanup
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}
// ...
// Otherwise no space-based cleanup is needed
return false;
}
磁碟分割區使用率計算方式
usedSpace * 100 / entireSpace + roundNum
計算磁碟使用率:
public class UtilAll {
/**
 * Computes the used-space ratio of the partition that holds {@code path}.
 * The percentage is rounded up to the next whole percent and returned as a
 * fraction (for example 0.37).
 *
 * @param path a path on the partition to measure
 * @return the used-space ratio, or -1 when the path is null/empty, does not
 *         exist, reports a non-positive total space, or measuring fails
 */
public static double getDiskPartitionSpaceUsedPercent(final String path) {
    if (path == null || path.isEmpty()) {
        log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
        return -1;
    }

    try {
        final File target = new File(path);
        if (!target.exists()) {
            log.error("Error when measuring disk space usage, file doesn't exist on this path: {}", path);
            return -1;
        }

        final long total = target.getTotalSpace();
        if (total <= 0) {
            return -1;
        }

        // Used = total - free (free space is not necessarily all usable by this JVM)
        final long used = total - target.getFreeSpace();
        // Capacity that counts for the ratio: what is used plus what is still usable
        final long capacity = used + target.getUsableSpace();
        final long scaled = used * 100;
        // Integer division rounded up to the next whole percent
        final long percent = scaled / capacity + (scaled % capacity != 0 ? 1 : 0);
        return percent / 100.0;
    } catch (Exception e) {
        log.error("Error when measuring disk space usage, got exception: :", e);
        return -1;
    }
}
}
先來看一些設定引數:
public class MessageStoreConfig {
// File retention time in hours (converted to milliseconds by the caller); defaults to 72
@ImportantField
private int fileReservedTime = 72;
// Interval between CommitLog file deletions, default 100 ms: after deleting one expired file
// the cleaner sleeps for this long before deleting the next one
private int deleteCommitLogFilesInterval = 100;
// Used after shutdown has been called on a file: if the file is still referenced once this
// interval (default 1000 * 120 ms) has elapsed, its reference count is forced to a negative
// value so it is treated as no longer referenced
private int destroyMapedFileIntervalForcibly = 1000 * 120;
}
回到刪除方法,繼續看到達清理條件的程式碼:
public class DefaultMessageStore implements MessageStore {
class CleanCommitLogService {
// Excerpt of deleteExpiredFiles focusing on the branch taken once a cleanup condition holds
private void deleteExpiredFiles() {
// ...
// Proceed when the deletion time is reached, disk usage is above the threshold, or a manual delete was requested
if (timeup || spacefull || manualDelete) {
// ...
// Immediate cleanup requires both the config switch and the cleanImmediately flag
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);
// fileReservedTime is in hours (default 72); convert it to milliseconds
fileReservedTime *= 60 * 60 * 1000;
// Delete files older than the retention period
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
// Nothing was deleted although disk space is running low
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
}
}
在CommitLog的deleteExpiredFile刪除檔案的方法中,又呼叫了MappedFileQueue的deleteExpiredFileByTime開始刪除過期檔案:
public class CommitLog {
    /**
     * Deletes expired CommitLog files by delegating to the mapped file queue.
     *
     * @param expiredTime         retention period added to a file's last-modified time
     * @param deleteFilesInterval pause between two deletions
     * @param intervalForcibly    grace period before a referenced file is destroyed forcibly
     * @param cleanImmediately    whether files are deleted regardless of their age
     * @return the number of files deleted, as reported by the queue
     */
    public int deleteExpiredFile(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately
    ) {
        final int deleted = this.mappedFileQueue.deleteExpiredFileByTime(
            expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
        return deleted;
    }
}
MappedFileQueue的deleteExpiredFileByTime處理邏輯如下:
public class MappedFileQueue {
    /**
     * Deletes mapped files whose retention period has elapsed, scanning the
     * snapshot from the front and stopping at the first file that is either
     * not yet expired or cannot be destroyed, so no file in the middle of the
     * queue is removed. The last entry of the snapshot is never examined.
     *
     * @param expiredTime         retention period added to a file's last-modified timestamp
     * @param deleteFilesInterval pause in milliseconds between two deletions; no pause when not positive
     * @param intervalForcibly    passed to {@code MappedFile.destroy}
     * @param cleanImmediately    when true, files are deleted regardless of their age
     * @return the number of files destroyed
     */
    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);
        if (null == mfs) {
            return 0;
        }

        // Exclude the last entry of the snapshot from the scan
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<>();
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // Last-modified time plus the retention period
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;
                    //...
                    // Pause before the next deletion unless this was the last candidate
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the caller instead of swallowing it
                            Thread.currentThread().interrupt();
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }

        deleteExpiredFile(files);
        return deleteCount;
    }
}
由於檔案可能被其他執行緒佔用,所以在刪除檔案前需要保證沒有其他執行緒使用檔案,可以看到呼叫了shutdown方法關閉檔案,然後呼叫isCleanupOver方法判斷是否沒有執行緒佔用檔案並且已經關閉檔案相關的資源,只有這兩個條件滿足時才可以刪除檔案:
public class MappedFile extends ReferenceResource {
    /**
     * Shuts the file down and, once its resources have been cleaned up,
     * closes the file channel and deletes the file from disk.
     *
     * @param intervalForcibly passed to {@code shutdown}
     * @return true when cleanup was complete and closing/deleting was attempted
     *         (even if that attempt failed); false when cleanup is not finished yet
     */
    public boolean destroy(final long intervalForcibly) {
        // The file has to be shut down before it can be removed
        this.shutdown(intervalForcibly);

        if (!this.isCleanupOver()) {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
            return false;
        }

        try {
            this.fileChannel.close();
            log.info("close file channel " + this.fileName + " OK");

            final long startedAt = System.currentTimeMillis();
            final boolean deleted = this.file.delete();
            final String outcome = deleted ? " OK, " : " Failed, ";
            log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                + outcome + "W:" + this.getWrotePosition() + " M:"
                + this.getFlushedPosition() + ", "
                + UtilAll.computeElapsedTimeMilliseconds(startedAt));
        } catch (Exception e) {
            log.warn("close file channel " + this.fileName + " Failed. ", e);
        }
        return true;
    }
}
ReferenceResource中有以下幾個成員變數需要關注(參照計數refCount、可用狀態available、清理狀態cleanupOver):
public abstract class ReferenceResource {
// Reference count of the resource; starts at 1
protected final AtomicLong refCount = new AtomicLong(1);
// Whether the resource is still available for use; starts as true
protected volatile boolean available = true;
// Whether the resource's cleanup has completed; starts as false
protected volatile boolean cleanupOver = false;
}
shutdown方法的處理邏輯如下:
public abstract class ReferenceResource {
    /**
     * Marks the resource unavailable and releases one reference. On later
     * calls, if the resource is still referenced and at least
     * {@code intervalForcibly} milliseconds have passed since the first
     * shutdown, the reference count is forced negative and released again.
     *
     * @param intervalForcibly minimum time in milliseconds since the first shutdown
     *                         before the reference count is forced negative
     */
    public void shutdown(final long intervalForcibly) {
        if (this.available) {
            // First shutdown: stop handing the resource out and remember when
            this.available = false;
            this.firstShutdownTimestamp = System.currentTimeMillis();
            this.release();
            return;
        }

        final boolean stillReferenced = this.getRefCount() > 0;
        if (!stillReferenced) {
            return;
        }

        final long sinceFirstShutdown = System.currentTimeMillis() - this.firstShutdownTimestamp;
        if (sinceFirstShutdown >= intervalForcibly) {
            // Force the count negative, then release
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}
以commit方法為例,在使用檔案進行操作的時候,通常會先呼叫hold方法申請佔用,使用完畢之後再呼叫release方法釋放佔用:
public class MappedFile extends ReferenceResource {
// Excerpt of commit showing the hold/release pattern around the actual work
public int commit(final int commitLeastPages) {
// ...
if (this.isAbleToCommit(commitLeastPages)) {
// Acquire a reference on the file before using it
if (this.hold()) {
commit0();
// Give the reference back once done
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
}
}
在hold方法中可以看到,先判斷當前檔案是否可用(通過available的值判斷),如果可用再進行如下操作:
public abstract class ReferenceResource {
    /**
     * Tries to acquire a reference on the resource.
     *
     * @return true when the resource is available and the reference count was
     *         positive before being incremented; false otherwise
     */
    public synchronized boolean hold() {
        if (!this.isAvailable()) {
            return false;
        }
        if (this.refCount.getAndIncrement() > 0) {
            return true;
        }
        // The count was not positive: undo the increment made above
        this.refCount.getAndDecrement();
        return false;
    }
}
在release方法中,首先可以看到對refCount進行了自減操作,釋放檔案的佔用數,然後進行如下判斷:
public abstract class ReferenceResource {
    /**
     * Gives back one reference. When the count drops to zero or below, runs
     * {@code cleanup} under the instance lock and stores its result in
     * {@code cleanupOver}.
     */
    public void release() {
        final long remaining = this.refCount.decrementAndGet();
        if (remaining <= 0) {
            synchronized (this) {
                // true means cleanup has finished, false means it has not
                this.cleanupOver = this.cleanup(remaining);
            }
        }
    }
}
cleanup的處理邏輯如下:
public class MappedFile extends ReferenceResource {
    /**
     * Frees the resources held by this mapped file.
     *
     * @param currentRef reference count at the time of the call, used for logging only
     * @return false when the file is still available (not shut down yet);
     *         true when cleanup was already done or has just been performed
     */
    @Override
    public boolean cleanup(final long currentRef) {
        // Still available means shutdown has not happened: refuse to clean
        if (this.isAvailable()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
            return false;
        }

        // Already cleaned up: nothing more to do
        if (this.isCleanupOver()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
            return true;
        }

        // Clean the mapped buffer and update the global counters
        clean(this.mappedByteBuffer);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(-this.fileSize);
        TOTAL_MAPPED_FILES.decrementAndGet();

        final String unmapped = "unmap file[REF:" + currentRef + "] " + this.fileName + " OK";
        log.info(unmapped);
        return true;
    }
}
總結
RocketMQ會註冊定時任務,定時執行清理任務,刪除過期檔案(預設72小時),在清理任務執行時,會先判斷是否達到了清理的條件,主要有以下三個條件:(1)到達了設定的清理時間點(deleteWhen,預設凌晨四點);(2)磁碟使用率超過了閾值;(3)手動觸發了刪除。
如果滿足以上條件之一,開始執行清理。在刪除過期檔案之前,需要保證檔案已經關閉以及沒有其他執行緒在參照該檔案,所以會呼叫關閉檔案的方法修改檔案可用狀態,將其改為不可用並清理相關資源,接著會呼叫isCleanupOver方法判斷該檔案是否沒有執行緒參照並且檔案佔用的相關資源已經釋放(MappedByteBuffer涉及到堆外記憶體,關閉檔案前需要清理相關記憶體,防止記憶體洩露),之後才可以對檔案進行刪除。