Timer和ScheduledThreadPoolExecutor的區別及原始碼分析

2022-07-02 06:00:39

Timer

基於單執行緒系統時間實現的延時、定期任務執行類。具體可以看下面紅色標註的程式碼。

public class Timer {
    /**
     * The timer task queue.  This data structure is shared with the timer
     * thread.  The timer produces tasks, via its various schedule calls,
     * and the timer thread consumes, executing timer tasks as appropriate,
     * and removing them from the queue when they're obsolete.
     */
    private final TaskQueue queue = new TaskQueue();

    /**
     * The timer thread.*/
    private final TimerThread thread = new TimerThread(queue);
class TimerThread extends Thread {
    /**
     * This flag is set to false by the reaper to inform us that there
     * are no more live references to our Timer object.  Once this flag
     * is true and there are no more tasks in our queue, there is no
     * work left for us to do, so we terminate gracefully.  Note that
     * this field is protected by queue's monitor!
     */
    boolean newTasksMayBeScheduled = true;

    /**
     * Our Timer's queue.  We store this reference in preference to
     * a reference to the Timer so the reference graph remains acyclic.
     * Otherwise, the Timer would never be garbage-collected and this
     * thread would never go away.
     */
    private TaskQueue queue;

    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

    /**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

 Timer延時、定時任務的實現採用單執行緒,在主迴圈(mainLoop)中迴圈遍歷任務佇列(TaskQueue),如果執行時間小於等於當前系統時間則執行任務,否則繼續等待(執行時間-當前時間)。

ScheduledThreadPoolExecutor

基於多執行緒、JVM時間實現的延時、定期任務執行類。具體可以看下面紅色標註的程式碼。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue中的take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
    /**
     * Returns current nanosecond time.
     */
    final long now() {
        return System.nanoTime();
    }

ThreadPoolExecutor執行流程

submit(task)->execute(task)
->1.當前執行緒數<核心執行緒數: addWorker(核心工作者執行緒)->runWorker-> 迴圈【getTask(workQueue.take)->task.run】
->2.當前執行緒數>=核心執行緒數:排隊任務成功:task add to workQueue(BlockingQueue)->addWorker(非核心工作者執行緒)......
->3.當前執行緒數>=核心執行緒數:排隊任務失敗:嘗試新增新執行緒執行任務 addWorker(非核心工作者執行緒)......

ScheduledThreadPoolExecutor執行延時、定期任務,核心程式碼就在runWorker,迴圈獲取任務佇列中的任務然後執行,在獲取任務的時候如果任務的執行時間沒到,則進行等待。延時時間的計算都是基於System.nanoTime(),即JVM時間。

 

優缺點:

1.Timer單執行緒,執行週期任務時,一次出錯,則TimerThread執行緒終止, 所有任務將無法執行。而且任務的執行時間可能會影響週期的準確性。

2.Timer基於系統時間,系統時間的修改會影響任務的執行。在以系統時間為準的場景中(public void schedule(TimerTask task, Date time))使用非常合適,使用週期性任務則受到極大影響,因為時間間隔被破壞!

3.ScheduledThreadPoolExecutor多執行緒,任務的執行不會相互影響,且能保證執行時間間隔的準確性。

4.ScheduledThreadPoolExecutor基於JVM時間,該時間本身無任何意義,僅用來計算時間間隔,不受系統時間影響。所以用來計算週期間隔特別合適,而且單位是納秒更加精確。因此延時任務、週期任務採用它比Timer更加靠譜!

 

總結:

Timer的使用場景,僅在基於系統時間為準的場景中非常合適(依賴當前系統時間進行判斷任務的執行)。

ScheduledThreadPoolExecutor的使用場景則更為廣泛,對延時任務、週期任務使用此類更靠譜(依賴時間間隔(JVM時間差值計算得到)進行判斷任務的執行)。基於系統時間執行的任務則無法精確(因為系統時間可以隨時調整)!