flowable非同步任務加鎖流程

2023-03-06 15:00:47

一、非同步任務執行

  1.1流程圖如下:

1.2時序圖如下:

加入有兩個非同步任務,同時觸達,那麼如下圖

1.3程式碼分析如下:

1.3.1 入口程式碼

附上部分原始碼。事物提交監聽器入口如下:
public class JobAddedTransactionListener implements TransactionListener {

    ...

    @Override
    public void execute(CommandContext commandContext) {
        asyncExecutor.executeAsyncJob(job);
    }

 

接入spring後,後續主要邏輯進入ExecuteAsyncRunnable類的run方法,如下:
public void run() {

    ...
    if (job instanceof AbstractRuntimeJobEntity) {

        boolean lockingNeeded = ((AbstractRuntimeJobEntity) job).isExclusive();
        boolean executeJob = true;
        if (lockingNeeded) {
            executeJob = lockJob();
        }
        if (executeJob) {
            executeJob(lockingNeeded);
        }

    }

}

 

1.3.2 加鎖程式碼

lockJob方法,主要程式碼如下:
protected boolean lockJob() {
    Job job = (Job) this.job; 
    try {
        // 加鎖
        jobServiceConfiguration.getCommandExecutor().execute(new LockExclusiveJobCmd(job, jobServiceConfiguration));

    } catch (Throwable lockException) {

        // 釋放job,等待下次被呼叫
        unacquireJob();

        return false;
    }

    return true;
}

 

真實加鎖方法,主要程式碼在DefaultInternalJobManager類,如下:
protected void lockJobScopeInternal(Job job) {
    ExecutionEntityManager executionEntityManager = getExecutionEntityManager();
    ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId());
    if (execution != null) {
        String lockOwner;
        Date lockExpirationTime;
        // 處理lockOwner與lockExpirationTime,省略
        
        executionEntityManager.updateProcessInstanceLockTime(execution.getProcessInstanceId(), lockOwner, lockExpirationTime);
    }

}

 

實際加鎖,其實是資料庫悲觀鎖,在MybatisExecutionDataManager類如下:
public void updateProcessInstanceLockTime(String processInstanceId, Date lockDate, String lockOwner, Date expirationTime) {
    HashMap<String, Object> params = new HashMap<>();
    params.put("id", processInstanceId);
    params.put("lockTime", lockDate);
    params.put("expirationTime", expirationTime);
    params.put("lockOwner", lockOwner);

    int result = getDbSqlSession().directUpdate("updateProcessInstanceLockTime", params);
    if (result == 0) {
        throw new FlowableOptimisticLockingException("Could not lock process instance");
    }
}

二、非同步job執行

  2.1 流程圖如下:

  • 注意,此圖中的鎖衝突,主要是多伺服器並行撈取資料時,容易觸發。

2.2 程式碼如下

2.2.1 入口

入口在AcquireAsyncJobsDueRunnable的run方法,主要程式碼如下:
public synchronized void run() {
    while (!isInterrupted) {

        // 全域性鎖——增加之後,所有範例中,鎖過期前同一時間只有一個可以執行
        if (configuration.isGlobalAcquireLockEnabled()) {

        
        } else {
            // 迴圈執行
            millisToWait = executeAcquireCycle(commandExecutor);

        }

        // 等待
        if (millisToWait > 0) {
            sleep(millisToWait);
        }

    }
}

 

2.2.2run方法注入spring

設定在ProcessEngineAutoConfiguration類,方法如下:
springProcessEngineConfiguration(){
    AsyncExecutor springAsyncExecutor = asyncExecutorProvider.getIfUnique();
    if (springAsyncExecutor != null) {
        conf.setAsyncExecutor(springAsyncExecutor);
    }
}

 

啟動在SpringProcessEngineConfiguration類,此類實現了spring的Lifecycle類,具體為start方法,層層堆疊如下:
public void start() {
    synchronized (lifeCycleMonitor) {
        if (!isRunning()) {
            enginesBuild.forEach(name -> {
                ProcessEngine processEngine = ProcessEngines.getProcessEngine(name);
                // 這裡
                processEngine.startExecutors();
                autoDeployResources(processEngine);
            });
            running = true;
        }
    }
}


public void startExecutors() {
    if (asyncExecutor != null && asyncExecutor.isAutoActivate()) {
        // 此處開啟
        asyncExecutor.start();
    }
}

public void start() {
    if (isActive) {
        return;
    }

    isActive = true;

    LOGGER.info("Starting up the async job executor [{}] for engine {}", getClass().getName(), getJobServiceConfiguration().getEngineName());

    initializeJobEntityManager();
    // 初始化
    initializeRunnables();
    // 真實開啟
    startAdditionalComponents();
    executeTemporaryJobs();
}

protected void startAdditionalComponents() {
    if (!isMessageQueueMode) {
        initAsyncJobExecutionThreadPool();
        // 開啟非同步任務方法
        startJobAcquisitionThread();
    }
}

protected void startTimerAcquisitionThread() {
    if (configuration.isTimerJobAcquisitionEnabled()) {
        if (timerJobAcquisitionThread == null) {
            timerJobAcquisitionThread = new Thread(timerJobRunnable);
        }
        
        // 開啟
        timerJobAcquisitionThread.start();
    }
}

 

2.2.3 關於加鎖

進入AcquireAsyncJobsDueRunnable類,邏輯如下:
protected long acquireAndExecuteJobs(CommandExecutor commandExecutor, int remainingCapacity) {
    boolean globalAcquireLockEnabled = configuration.isGlobalAcquireLockEnabled();
    try {
        List<? extends JobInfoEntity> acquiredJobs;
        // 獲取並加鎖
        acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(asyncExecutor, remainingCapacity, jobEntityManager));

        // 執行
        List<JobInfoEntity> rejectedJobs = offerJobs(acquiredJobs);

        LOGGER.debug("Jobs acquired: {}, rejected: {}, for engine {}", acquiredJobs.size(), rejectedJobs.size(), getEngineName());
        

    } catch (FlowableOptimisticLockingException optimisticLockingException) {

    } catch (Throwable e) {
        LOGGER.warn("exception for engine {} during async job acquisition: {}", getEngineName(), e.getMessage(), e);
    }

    return asyncExecutor.getDefaultAsyncJobAcquireWaitTimeInMillis();
}

 

看一下AcquireJobsCmd類,程式碼如下:
public List<? extends JobInfoEntity> execute(CommandContext commandContext) {
    int maxResults = Math.min(remainingCapacity, asyncExecutor.getMaxAsyncJobsDuePerAcquisition());
    List<String> enabledCategories = asyncExecutor.getJobServiceConfiguration().getEnabledJobCategories();
    // 查詢資料庫
    List<? extends JobInfoEntity> jobs = jobEntityManager.findJobsToExecute(enabledCategories, new Page(0, maxResults));

    for (JobInfoEntity job : jobs) {
        // 加鎖
        lockJob(job, asyncExecutor.getAsyncJobLockTimeInMillis(), asyncExecutor.getJobServiceConfiguration());
    }

    return jobs;
}

protected void lockJob(JobInfoEntity job, int lockTimeInMillis, JobServiceConfiguration jobServiceConfiguration) {
    GregorianCalendar gregorianCalendar = calculateLockExpirationTime(lockTimeInMillis, jobServiceConfiguration);
    job.setLockOwner(asyncExecutor.getLockOwner());
    job.setLockExpirationTime(gregorianCalendar.getTime());
}

 

到這裡,實際上並沒有資料庫操作,但是注意,flowable的select方法,會把查出來的資料,放入快取中。且剛剛我們結果的是外層被命令模式封裝的責任鏈,所以,可以知道業務程式碼處理完,會執行責任鏈後置程式碼,具體入庫為CommandContextInterceptor類execute中的commandContext.close()程式碼,此方法內會沖刷session(flushSessions方法)。我們直接看dbsqlSession的處理
public void flush() {
    // 此方法把快取中修改過的物件,組裝為update方法
    determineUpdatedObjects(); 
    removeUnnecessaryOperations();

    if (LOGGER.isDebugEnabled()) {
        debugFlush();
    }

    flushInserts();
    // 更新資料
    flushUpdates();
    flushDeletes();
}

 

至此,我們只要再看下flushUpdates即可,程式碼如下:
protected void flushUpdates() {
    for (Entity updatedObject : updatedObjects) {
        // 執行變更
        int updatedRecords = sqlSession.update(updateStatement, updatedObject);
        // 變更失敗獲取鎖失敗
        if (updatedRecords == 0) {
            throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently");
        }

    }
    updatedObjects.clear();
}

三、關於全域性鎖

3.1 非同步job的lockOwner設定

關於job的lockOwner,如果是機器A從timeJob撈起來,滿足條件的資料,在插入job表是會直接在本機設定,然後註冊一個事務監聽器。job入庫事務提交後,還是機器A來執行job,這個時候,job的死迴圈,不回拉到這個剛剛timeJob撈起過的資料。 現在看起來,以下兩種場景,lockOwner會為空: 1、獨佔任務並行執行時,沒有搶到流程範例鎖的任務,重新插入時,lockOwner為空 2、非同步執行緒池滿了,新任務插入時,報錯被捕獲,此時會情況lockOwner

3.2 全域性鎖對效能的影響分析

基於非同步job的lockOwner設定,當任務執行時間比較密集。比如同一秒有10000個需要執行的任務時,此時假定我們有5臺伺服器,單伺服器的非同步執行緒池佇列長度為200。那麼無論對job還是timeJob,都有大量待認領的任務。 我們可以簡單的把一次拉起帶執行job並操作的過程,分為三步:
  1. 獲取帶執行任務(批次,預設500)
  2. 加鎖,指定此任務由本機執行。
  3. 交給本機非同步執行緒池執行
不開全域性鎖的時候,多個伺服器執行步驟1時,可能會拉取到同樣的資料。此時他們會嘗試以update xx where id=xx;這種格式,對500條sql進行提交。此時有可能多臺機器都在和資料庫互動。因為每次update語句時,事物還沒有提交,所以當前邏輯暫時都沒有問題。但是最終事物提交時,永遠只有一個能成功提交。此時資料庫開始回滾。
當我們機器足夠多,且帶執行任務足夠多時,上述情況大概率發生。而實際上步驟3因為只要交給非同步執行緒池即可結束,並非耗時操作。綜上理解為,全域性鎖應該可以解決1、2中的鎖衝突,從而提升效能。
再具體一些,如果我們的待執行任務不多,那麼可以理解為同一時間,只要有一個機器的job處理器在拉取任務,放入自身的非同步佇列,就可以處理完所有的job,那麼此時全域性鎖開啟其實不會拖慢效能。 相對應的,如果帶執行任務足夠多,多機器並行時,比如AB兩臺機器,不開全域性鎖,可能節約的時間為A機器執行到步驟3時,B機器執行步驟1拉取。但是可能面對的問題是A機器執行步驟3之前,比如1或2時,B機器已經開始拉取資料,此時B機器執行步驟鎖定job時會出現鎖衝突。 即我們可以通過1/2/3步驟,每一步的耗時比,來評判全域性鎖的效能收益。但是考慮到1/2步驟時資料庫操作,且當資料量為500條時步驟2為update xx where id=xx;*500次的資料庫操作,而步驟3為記憶體操作。所以暫時任務步驟1&2的耗時>>步驟3的耗時,所以全域性鎖可以產生全域性正面收益。