【RocketMQ】資料的清理機制

2023-10-23 12:01:48

Broker在啟動的時候會註冊定時任務,定時清理過期的資料,預設是每10s執行一次,分別清理CommitLog檔案和ConsumeQueue檔案:

public class DefaultMessageStore implements MessageStore {

    // CommitLog清理類
    private final CleanCommitLogService cleanCommitLogService;
    // ConsumeQueue清理類
    private final CleanConsumeQueueService cleanConsumeQueueService;

    public void start() throws Exception {
        // ...

        // 新增定時任務
        this.addScheduleTask();

        //...
    }

    private void addScheduleTask() {
        // ...
        // 註冊定時定理任務,預設10s執行一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // 清理資料
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

        // ...
    }

    private void cleanFilesPeriodically() {
        // 啟動CommitLog清理
        this.cleanCommitLogService.run();
        // 啟動ConsumeQueue清理
        this.cleanConsumeQueueService.run();
    }
}

CommitLog檔案清理

CommitLog檔案清理的邏輯主要在CleanCommitLogService中,它是DefaultMessageStore的內部類,呼叫了deleteExpiredFiles方法刪除過期檔案:

public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        public void run() {
            try {
                // 刪除過期檔案
                this.deleteExpiredFiles();
                this.redeleteHangedFile();
            } catch (Throwable e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    }
}

在執行清理任務之前,首先會獲取一些設定引數,然後通過isTimeToDelete方法判斷是否到了清理檔案的時間,通過isSpaceToDelete方法計算磁碟是否還有足夠的空間,處於以下三種情況之一,將會執行檔案清理操作:

  1. 已經到了清理檔案的時間,預設是4點;
  2. 磁碟使用已經超過了設定的閾值;
  3. 手動刪除;
public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        private void deleteExpiredFiles() {
            int deleteCount = 0;
            // 獲取檔案保留時間
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            // 刪除檔案的間隔時間
            int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            // 計算是否到了清理檔案的時間,預設是4點
            boolean timeup = this.isTimeToDelete();
            // 計算磁碟是否有足夠空間
            boolean spacefull = this.isSpaceToDelete();
            // 是否手動刪除
            boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
            // 如果到了時間或者磁碟使用率超過閾值或者是手動刪除
            if (timeup || spacefull || manualDelete) {

                if (manualDelete)
                    this.manualDeleteFileSeveralTimes--;
                // 是否立刻清理
                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                    fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);
                // fileReservedTime預設是72小時,這裡轉換為毫秒數
                fileReservedTime *= 60 * 60 * 1000;
                // 刪除超過72小時的檔案
                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                    destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                } else if (spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }
    }
}

清理時間判斷

deleteWhen:清理時間點,預設是凌晨四點:

public class MessageStoreConfig {

     // 觸發檔案刪除的時間,預設凌晨四點
    @ImportantField
    private String deleteWhen = "04";
}

isTimeToDelete中首先會獲取設定的清理時間,預設是早上4點,然後判斷當前時間是否已經到達了這個時間點:

        private boolean isTimeToDelete() {
            // 獲取設定的清理時間,預設是早上4點
            String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
            // 如果到達了時間,返回true
            if (UtilAll.isItTimeToDo(when)) {
                DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
                return true;
            }

            return false;
        }

磁碟空間使用判斷

RocketMQ設定了兩個閾值,如果磁碟使用率超過了這些閾值,需要立刻進行清理:

  • diskSpaceWarningLevelRatio:磁碟使用率警戒閾值,預設0.90;
  • diskSpaceCleanForciblyRatio:強制進行清理的磁碟使用比例閾值,預設0.85;
    class CleanCommitLogService {
        // 磁碟使用率警戒閾值,預設0.9
        private final double diskSpaceWarningLevelRatio =
            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
        // 強制進行清理的磁碟使用比例閾值,預設0.85
        private final double diskSpaceCleanForciblyRatio =
            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
    }

計算磁碟使用率

  1. 獲取Commitlog儲存目錄;
  2. 計算每個目錄下磁碟分割區的使用率;
  3. 記錄這些目錄中,磁碟使用率最小的那個值,記在minPhysicRatio中;

接下來對磁碟使用率最小的那個值minPhysicRatio進行判斷:

  1. 如果使用率最小的那個分割區都已經大於警告閾值(預設0.90)說明需要立刻清理,將立即執行清理狀態cleanImmediately置為true;
  2. 如果使用率最小的那個分割區大於強制進行清理的磁碟使用比例閾值(預設0.85),說明需要立刻清理,將立即執行清理狀態cleanImmediately置為true;
  3. 其他情況,表示磁碟使用率正常;
        private boolean isSpaceToDelete() {
            // 獲取磁碟最大使用比率
            double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
            cleanImmediately = false;

            {
                // 獲取Commitlog儲存目錄
                String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
                String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
                Set<String> fullStorePath = new HashSet<>();
                double minPhysicRatio = 100;
                String minStorePath = null;
                for (String storePathPhysic : storePaths) {
                    // 計算磁碟分割區使用率
                    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
                    if (minPhysicRatio > physicRatio) {
                        minPhysicRatio =  physicRatio; // 更新最小使用率,主要是記錄所有目錄分割區中使用率最小的值
                        minStorePath = storePathPhysic;
                    }
                    // 如果大於設定的最大磁碟使用比例
                    if (physicRatio > diskSpaceCleanForciblyRatio) {
                        // 將對應的目錄加入到fullStorePath
                        fullStorePath.add(storePathPhysic);
                    }
                }
                DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
                if (minPhysicRatio > diskSpaceWarningLevelRatio) { // 如果最小的那個磁碟使用率大於警告線
                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                    if (diskok) {
                        DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
                                ", so mark disk full, storePathPhysic=" + minStorePath);
                    }
                    // 立即執行清理置為true
                    cleanImmediately = true;
                } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {// 如果最小的那個磁碟使用率大於警告線
                    cleanImmediately = true;
                } else {
                    // 其他情況,表示磁碟使用率正常
                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                    if (!diskok) {
                        DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
                                ", so mark disk ok, storePathPhysic=" + minStorePath);
                    }
                }

                // 路徑為空或者檔案不存在的時候minPhysicRatio值會變成-1,
                if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
                    DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
                            + minPhysicRatio + ", storePathPhysic=" + minStorePath);
                    return true;
                }
            }

            // ...

            // 其他情況,表示磁碟使用率正常
            return false;
        }

磁碟分割區使用率計算方式

  1. 獲取磁碟總空間大小totalSpace;
  2. 計算已使用空間大小(usedSpace) = 磁碟總空間(totalSpace) - 剩餘空間(剩餘空間不一定全部是可用的);
  3. 計算可用的剩餘空間usableSpace;
  4. 計算可用空間總大小(entireSpace) = 已使用空間大小(usedSpace) + 可用的剩餘空間(usableSpace);
  5. 計算了一個roundNum;
  6. 使用usedSpace * 100 / entireSpace + roundNum計算磁碟使用率;
public class UtilAll {
    public static double getDiskPartitionSpaceUsedPercent(final String path) {
        // 如果路徑為空,返回-1
        if (null == path || path.isEmpty()) {
            log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
            return -1;
        }

        try {
            File file = new File(path);
            // 如果檔案不存在
            if (!file.exists()) {
                log.error("Error when measuring disk space usage, file doesn't exist on this path: {}", path);
                return -1;
            }
            // 獲取總空間
            long totalSpace = file.getTotalSpace();
            if (totalSpace > 0) {
                // 已使用空間 = 磁碟總空間 - 剩餘空間(剩餘空間不一定全部是可用的)
                long usedSpace = totalSpace - file.getFreeSpace();
                // 可用剩餘空間
                long usableSpace = file.getUsableSpace();
                // 可用空間總大小
                long entireSpace = usedSpace + usableSpace;
                long roundNum = 0;
                if (usedSpace * 100 % entireSpace != 0) {
                    roundNum = 1;
                }
                long result = usedSpace * 100 / entireSpace + roundNum;
                return result / 100.0;
            }
        } catch (Exception e) {
            log.error("Error when measuring disk space usage, got exception: :", e);
            return -1;
        }
        // 異常情況返回-1
        return -1;
    }
}

清理檔案

先來看一些設定引數:

public class MessageStoreConfig {
    // 檔案的保留時間,預設72小時
    @ImportantField
    private int fileReservedTime = 72;

    // CommitLog檔案刪除間隔時間,預設100ms,刪除過期檔案後會休眠一段時間(這個間隔時間),在進行下一個
    private int deleteCommitLogFilesInterval = 100;

    // 主要用於在呼叫shuntdown關閉檔案後,如果到達這個間隔時間之後依舊有執行緒參照該檔案時,會強制將檔案的參照數置為負數,表示不再有執行緒參照
    private int destroyMapedFileIntervalForcibly = 1000 * 120;
}

回到刪除方法,繼續看到達清理條件的程式碼:

  1. 判斷是否需要立刻清理,cleanImmediately為true並且開啟了允許強制清理;
  2. fileReservedTime預設是72小時,將其轉換為毫秒數;
  3. 呼叫CommitLog的deleteExpiredFile方法,將檔案的過期時間、檔案刪除間隔、是否立刻清理等引數傳入,開始刪除過期檔案;
public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        private void deleteExpiredFiles() {
            // ...
            // 如果到了時間或者磁碟使用率超過閾值或者是手動刪除
            if (timeup || spacefull || manualDelete) {
                // ...
                // 是否立刻清理
                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                    fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);
                // fileReservedTime預設是72小時,這裡轉換為毫秒數
                fileReservedTime *= 60 * 60 * 1000;
                // 刪除超過72小時的檔案
                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                    destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                } else if (spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }
    }
}

在CommitLog的deleteExpiredFile刪除檔案的方法中,又呼叫了MappedFileQueue的deleteExpiredFileByTime開始刪除過期檔案:

public class CommitLog {
   public int deleteExpiredFile(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately
    ) {
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
    }
}

刪除過期檔案

MappedFileQueue的deleteExpiredFileByTime處理邏輯如下:

  1. 計算檔案的存活時間:檔案最後一次修改的時間 + 過期時間(預設72小時);
  2. 如果當前時間大於檔案的存活時間,或者cleanImmediately為true,這兩個條件都表示需要清理檔案,則呼叫MappedFile的destroy方法刪除檔案;
  3. 刪除當前檔案後,如果設定了檔案刪除間隔,並且當前檔案不是最後一個,則休眠一段時間(設定的刪除間隔時間)後再刪除下一個過期檔案;
public class MappedFileQueue {
    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);
        if (null == mfs)
            return 0;
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                // 檔案最後一次修改的時間 + 過期時間
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                // 如果當前時間大於檔案的存活時間獲取 cleanImmediately為true
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    // 清理檔案
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;

                        //...
                        // 如果設定了檔案刪除間隔,並且不是最後一個檔案,休眠一段時間後再刪除下一個過期檔案
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }
        deleteExpiredFile(files);
        return deleteCount;
    }
}

由於檔案可能被其他執行緒佔用,所以在刪除檔案前需要保證沒有其他執行緒使用檔案,可以看到呼叫了shutdown方法關閉檔案,然後呼叫isCleanupOver方法判斷是否沒有執行緒佔用檔案並且已經關閉檔案相關的資源,只有這兩個條件滿足時才可以刪除檔案:

public class MappedFile extends ReferenceResource {
    public boolean destroy(final long intervalForcibly) {
        // 需要先關閉檔案
        this.shutdown(intervalForcibly);
        // 是否關閉
        if (this.isCleanupOver()) {
            try {
                // 關閉filechannel
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");
                long beginTime = System.currentTimeMillis();
                // 刪除檔案
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeElapsedTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }
}

shutdown關閉檔案

ReferenceResource中有兩個成員變數需要關注:

  • available:表示檔案是否可用,預設為true,表示檔案可用,當呼叫shutdown方法關閉檔案的時候,會置為false;
  • refCount:當前檔案的參照數量,初始化為1,表示有一個執行緒在使用檔案,對檔案進行操作前,會先申請佔用(hold),申請佔用成功時refCount會增1表示新增一個參照,使用完畢之後會釋放(release方法),將refCount的值再減1,通過refCount的值可以判斷是否有其他執行緒在使用檔案;
  • cleanupOver:檔案的清理狀態,如果檔案已經被關閉、沒有執行緒參照並且檔案佔用的相關資源已經釋放,此時會被置為true,否則為false,表示檔案的相關資源還未清理完畢;
public abstract class ReferenceResource {
    protected final AtomicLong refCount = new AtomicLong(1);
    protected volatile boolean available = true;
    protected volatile boolean cleanupOver = false;
}

shutdown方法的處理邏輯如下:

  1. 首先判斷檔案是否可用狀態(available)是否為true,如果為true,將available為置為false,表示該檔案不再提供使用,並記錄本次關閉檔案的時間戳,然後呼叫release方法關閉檔案相關的資源;
  2. 如果available為false但是該檔案的參照數量大於0,表示檔案已經不再提供使用但是還有其他執行緒在參照,此時判斷當前時間距離上次關閉檔案的時間是否大於強制清理間隔,如果大於等於表示到了強制清理的時間,強制將檔案的參照數量置為負數,再呼叫release方法關閉檔案相關的資源;
public abstract class ReferenceResource {
   public void shutdown(final long intervalForcibly) {
        // 檔案是否可用狀態為true
        if (this.available) {
            // 置為false表示不再提供使用
            this.available = false;
            // 記錄時間
            this.firstShutdownTimestamp = System.currentTimeMillis();
            // 開始釋放資源
            this.release();
        } else if (this.getRefCount() > 0) { // 如果檔案的參照數量大於0,表示檔案被佔用
            // 計算當前時間 - 上次關閉檔案的時間,是否大於強制清理間隔
            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                // 參照數量設定為負值
                this.refCount.set(-1000 - this.getRefCount());
                // 釋放資源
                this.release();
            }
        }
    }
}

以commit方法為例,在使用檔案進行操作的時候,通常會先呼叫hold方法申請佔用,使用完畢之後再呼叫release方法釋放佔用:

public class MappedFile extends ReferenceResource {
    public int commit(final int commitLeastPages) {
        // ...
        if (this.isAbleToCommit(commitLeastPages)) {
            // 申請檔案的佔用
            if (this.hold()) {
                commit0();
                // 釋放檔案的佔用
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
    }
}

在hold方法中可以看到,先判斷當前檔案是否可用(通過available的值判斷),如果可用再進行如下操作:

  1. 呼叫getAndIncrement先讓refCount自增,如果refCount大於0表示檔案佔用成功;
  2. 如果上一步refCount自增之後小於等於0,表示未成功,由於上面進行了一次自增操作,所以這裡要再減回去;
public abstract class ReferenceResource {
    public synchronized boolean hold() {
        // 判斷檔案是否可用
        if (this.isAvailable()) {
            // 呼叫getAndIncrement先讓refCount自增,如果refCount大於0表示獲取成功
            if (this.refCount.getAndIncrement() > 0) {
                return true;
            } else {
                // 如果上一步refCount自增之後小於等於0,表示未獲取成功,由於上面進行了一次自增操作,所以這裡要再減回去
                this.refCount.getAndDecrement();
            }
        }

        return false;
    }
}

在release方法中,首先可以看到對refCount進行了自減操作,釋放檔案的佔用數,然後進行如下判斷:

  1. 如果此時refCount的值依舊大於0,表示有其他執行緒還在參照,直接返回即可;
  2. 如果此時refCount的值小於等於0,表示沒有其他執行緒使用,加鎖呼叫cleanup方法開始清理檔案的使用資源,然後返回清理狀態,這一步主要是在檔案沒有其他執行緒使用的時候,關閉檔案佔用相關資源;
public abstract class ReferenceResource {

    public void release() {
        // 自減操作,釋放檔案的佔用數
        long value = this.refCount.decrementAndGet();
        // 如果檔案的參照數量大於0,表示檔案被佔用
        if (value > 0)
            return;
        // 加鎖
        synchronized (this) {
            // 清理檔案的使用資源,返回清理狀態,true表示清理完畢,false表示未完成
            this.cleanupOver = this.cleanup(value);
        }
    }
}

cleanup關閉檔案佔用資源

cleanup的處理邏輯如下:

  1. 如果檔案的可用狀態vailable還為true,表示檔案未關閉,不能清理,返回false;
  2. 如果isCleanupOver已經為true,表示已經清理完畢,返回true;
  3. 開始清理檔案佔用的資源,MappedByteBuffer涉及到堆外記憶體,所以關閉檔案前需要清理相關記憶體,防止記憶體洩露;
  4. 清理完畢,返回true賦值給cleanupOver,表示檔案相關佔用資源都已清理完畢;
public class MappedFile extends ReferenceResource {
    @Override
    public boolean cleanup(final long currentRef) {
        // 如果檔案的可用狀態還為true,表示檔案未關閉,不能清理,返回false
        if (this.isAvailable()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
            return false;
        }
        // 如果isCleanupOver已經為true,表示已經清理完畢,返回true
        if (this.isCleanupOver()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
            return true;
        }
        // 開始清理檔案的使用資源
        clean(this.mappedByteBuffer);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
        TOTAL_MAPPED_FILES.decrementAndGet();
        log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
        // 返回true,之後cleanupOver為true,表示已經清理了檔案的相關佔用資源
        return true;
    }
}

總結
RocketMQ會註冊定時任務,定時執行清理任務,刪除過期檔案(預設72小時),在清理任務執行時,會先判斷是否達到了清理的條件,主要有以下三個條件:

  1. 已經到了清理檔案的時間,預設是4點;
  2. 磁碟使用已經超過了設定的閾值;
  3. 手動刪除;

如果滿足以上條件之一,開始執行清理。在刪除過期檔案之前,需要保證檔案已經關閉以及沒有其他執行緒在參照該檔案,所以會呼叫關閉檔案的方法修改檔案可用狀態,將其改為不可用並清理相關資源,接著會呼叫isCleanupOver方法判斷該檔案是否沒有執行緒參照並且檔案佔用的相關資源已經釋放(MappedByteBuffer涉及到堆外記憶體,關閉檔案前需要清理相關記憶體,防止記憶體洩露),之後才可以對檔案進行刪除。