// Transaction listener excerpt: when execute(CommandContext) is invoked, the job held by this
// listener is handed to the async executor through asyncExecutor.executeAsyncJob(job).
// NOTE(review): the rest of the class body is elided ("...") and the class's closing brace is
// not part of this excerpt; when the listener fires relative to the transaction is not shown here.
public class JobAddedTransactionListener implements TransactionListener { ... @Override public void execute(CommandContext commandContext) { asyncExecutor.executeAsyncJob(job); }
// Runs a single job. The beginning of the body is elided ("...") in this excerpt.
// Only jobs that are AbstractRuntimeJobEntity instances are handled in the visible part:
//  - lockingNeeded mirrors the job's isExclusive() flag;
//  - when locking is needed, lockJob() must return true for the job to be executed;
//  - executeJob(lockingNeeded) is called when no lock was needed or the lock was obtained.
// Nothing visible here happens for other job types.
public void run() { ... if (job instanceof AbstractRuntimeJobEntity) { boolean lockingNeeded = ((AbstractRuntimeJobEntity) job).isExclusive(); boolean executeJob = true; if (lockingNeeded) { executeJob = lockJob(); } if (executeJob) { executeJob(lockingNeeded); } } }
protected boolean lockJob() { Job job = (Job) this.job; try { // 加鎖 jobServiceConfiguration.getCommandExecutor().execute(new LockExclusiveJobCmd(job, jobServiceConfiguration)); } catch (Throwable lockException) { // 釋放job,等待下次被呼叫 unacquireJob(); return false; } return true; }
protected void lockJobScopeInternal(Job job) { ExecutionEntityManager executionEntityManager = getExecutionEntityManager(); ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId()); if (execution != null) { String lockOwner; Date lockExpirationTime; // 處理lockOwner與lockExpirationTime,省略 executionEntityManager.updateProcessInstanceLockTime(execution.getProcessInstanceId(), lockOwner, lockExpirationTime); } }
/**
 * Issues the {@code updateProcessInstanceLockTime} statement for one process instance.
 *
 * @param processInstanceId id of the process instance to lock, bound as {@code id}
 * @param lockDate value bound as {@code lockTime}
 * @param lockOwner value bound as {@code lockOwner}
 * @param expirationTime value bound as {@code expirationTime}
 * @throws FlowableOptimisticLockingException when the statement updated no row
 */
public void updateProcessInstanceLockTime(String processInstanceId, Date lockDate, String lockOwner, Date expirationTime) {
    HashMap<String, Object> statementParameters = new HashMap<>();
    statementParameters.put("id", processInstanceId);
    statementParameters.put("lockTime", lockDate);
    statementParameters.put("expirationTime", expirationTime);
    statementParameters.put("lockOwner", lockOwner);

    int updatedRows = getDbSqlSession().directUpdate("updateProcessInstanceLockTime", statementParameters);
    if (updatedRows != 0) {
        return;
    }
    // Zero rows touched: the lock could not be taken.
    throw new FlowableOptimisticLockingException("Could not lock process instance");
}
public synchronized void run() { while (!isInterrupted) { // 全域性鎖——增加之後,所有範例中,鎖過期前同一時間只有一個可以執行 if (configuration.isGlobalAcquireLockEnabled()) { } else { // 迴圈執行 millisToWait = executeAcquireCycle(commandExecutor); } // 等待 if (millisToWait > 0) { sleep(millisToWait); } } }
// Excerpt of the process engine configuration factory: asks asyncExecutorProvider for its
// unique AsyncExecutor via getIfUnique() and, when one is returned (non-null), installs it on
// conf with setAsyncExecutor(...).
// NOTE(review): the return type, the parameters and the rest of the body are elided in this
// excerpt; asyncExecutorProvider is presumably a Spring ObjectProvider - confirm against the full source.
springProcessEngineConfiguration(){ AsyncExecutor springAsyncExecutor = asyncExecutorProvider.getIfUnique(); if (springAsyncExecutor != null) { conf.setAsyncExecutor(springAsyncExecutor); } }
public void start() { synchronized (lifeCycleMonitor) { if (!isRunning()) { enginesBuild.forEach(name -> { ProcessEngine processEngine = ProcessEngines.getProcessEngine(name); // 這裡 processEngine.startExecutors(); autoDeployResources(processEngine); }); running = true; } } } public void startExecutors() { if (asyncExecutor != null && asyncExecutor.isAutoActivate()) { // 此處開啟 asyncExecutor.start(); } } public void start() { if (isActive) { return; } isActive = true; LOGGER.info("Starting up the async job executor [{}] for engine {}", getClass().getName(), getJobServiceConfiguration().getEngineName()); initializeJobEntityManager(); // 初始化 initializeRunnables(); // 真實開啟 startAdditionalComponents(); executeTemporaryJobs(); } protected void startAdditionalComponents() { if (!isMessageQueueMode) { initAsyncJobExecutionThreadPool(); // 開啟非同步任務方法 startJobAcquisitionThread(); } } protected void startTimerAcquisitionThread() { if (configuration.isTimerJobAcquisitionEnabled()) { if (timerJobAcquisitionThread == null) { timerJobAcquisitionThread = new Thread(timerJobRunnable); } // 開啟 timerJobAcquisitionThread.start(); } }
protected long acquireAndExecuteJobs(CommandExecutor commandExecutor, int remainingCapacity) { boolean globalAcquireLockEnabled = configuration.isGlobalAcquireLockEnabled(); try { List<? extends JobInfoEntity> acquiredJobs; // 獲取並加鎖 acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(asyncExecutor, remainingCapacity, jobEntityManager)); // 執行 List<JobInfoEntity> rejectedJobs = offerJobs(acquiredJobs); LOGGER.debug("Jobs acquired: {}, rejected: {}, for engine {}", acquiredJobs.size(), rejectedJobs.size(), getEngineName()); } catch (FlowableOptimisticLockingException optimisticLockingException) { } catch (Throwable e) { LOGGER.warn("exception for engine {} during async job acquisition: {}", getEngineName(), e.getMessage(), e); } return asyncExecutor.getDefaultAsyncJobAcquireWaitTimeInMillis(); }
public List<? extends JobInfoEntity> execute(CommandContext commandContext) { int maxResults = Math.min(remainingCapacity, asyncExecutor.getMaxAsyncJobsDuePerAcquisition()); List<String> enabledCategories = asyncExecutor.getJobServiceConfiguration().getEnabledJobCategories(); // 查詢資料庫 List<? extends JobInfoEntity> jobs = jobEntityManager.findJobsToExecute(enabledCategories, new Page(0, maxResults)); for (JobInfoEntity job : jobs) { // 加鎖 lockJob(job, asyncExecutor.getAsyncJobLockTimeInMillis(), asyncExecutor.getJobServiceConfiguration()); } return jobs; } protected void lockJob(JobInfoEntity job, int lockTimeInMillis, JobServiceConfiguration jobServiceConfiguration) { GregorianCalendar gregorianCalendar = calculateLockExpirationTime(lockTimeInMillis, jobServiceConfiguration); job.setLockOwner(asyncExecutor.getLockOwner()); job.setLockExpirationTime(gregorianCalendar.getTime()); }
public void flush() { // 此方法把快取中修改過的物件,組裝為update方法 determineUpdatedObjects(); removeUnnecessaryOperations(); if (LOGGER.isDebugEnabled()) { debugFlush(); } flushInserts(); // 更新資料 flushUpdates(); flushDeletes(); }
protected void flushUpdates() { for (Entity updatedObject : updatedObjects) { // 執行變更 int updatedRecords = sqlSession.update(updateStatement, updatedObject); // 變更失敗獲取鎖失敗 if (updatedRecords == 0) { throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently"); } } updatedObjects.clear(); }