自己動手寫執行緒池——向JDK執行緒池進發

2022-10-22 06:00:49

自己動手寫執行緒池——向JDK執行緒池進發

前言

在前面的文章自己動手寫乞丐版執行緒池中,我們寫了一個非常簡單的執行緒池實現,這個只是一個非常簡單的實現,在本篇文章當中我們將要實現一個和JDK內部實現的執行緒池非常相似的執行緒池。

JDK執行緒池一瞥

我們首先看一個JDK給我們提供的執行緒池ThreadPoolExecutor的建構函式的引數:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

引數解釋:

  • corePoolSize:這個引數你可以理解為執行緒池當中至少需要 corePoolSize 個執行緒,初始時執行緒池當中執行緒的個數為0,當執行緒池當中執行緒的個數小於 corePoolSize 每次提交一個任務都會建立一個執行緒,並且先執行這個提交的任務,然後再去任務佇列裡面去獲取新的任務,然後再執行。
  • maximumPoolSize:這個引數指的是執行緒池當中能夠允許的最大的執行緒的數目,當任務佇列滿了之後如果這個時候有新的任務想要加入佇列當中,當發現佇列滿了之後就建立新的執行緒去執行任務,但是需要滿足最大的執行緒的個數不能夠超過 maximumPoolSize 。
  • keepAliveTime 和 unit:這個主要是用於時間的表示,當佇列當中多長時間沒有資料的時候執行緒自己退出,前面談到了執行緒池當中任務過多的時候會超過 corePoolSize ,當執行緒池閒下來的時候這些多餘的執行緒就可以退出了。
  • workQueue:這個就是用於儲存任務的阻塞佇列。
  • threadFactory:這個引數倒不是很重要,執行緒工廠。
  • handler:這個表示拒絕策略,JDK給我們提供了四種策略:
    • AbortPolicy:丟擲異常。
    • DiscardPolicy:放棄這個任務。
    • CallerRunPolicy:提交任務的執行緒執行。
    • DiscardOldestPolicy:放棄等待時間最長的任務。

如果上面的引數你不能夠理解,可以先閱讀這篇文章自己動手寫乞丐版執行緒池。基於上面談到的引數,執行緒池當中提交任務的流程大致如下圖所示:

自己動手實現執行緒池

根據前面的引數分析我們自己實現的執行緒池需要實現一下功能:

  • 能夠提交Runnable的任務和Callable的任務。
  • 執行緒池能夠自己實現動態的擴容和所容,動態調整執行緒池當中執行緒的數目,當任務多的時候能夠增加執行緒的數目,當任務少的時候多出來的執行緒能夠自動退出。
  • 有自己的拒絕策略,當任務佇列滿了,執行緒數也達到最大的時候,需要拒絕提交的任務。

執行緒池引數介紹

  private AtomicInteger ct = new AtomicInteger(0); // 當前在執行任務的執行緒個數
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;

  private ArrayList<Worker> workers = new ArrayList<>();

  private volatile boolean isStopped;
  private boolean useTimed;

引數解釋如下:

  • ct:表示當前執行緒池當中執行緒的個數。

  • corePoolSize:執行緒池當中核心執行緒的個數,意義和上面談到的JDK的執行緒池意義一致。

  • maximumPoolSize:執行緒池當中最大的執行緒個數,意義和上面談到的JDK的執行緒池意義一致。

  • keepAliveTime 和 unit:和JDK執行緒池的引數意義一致。

  • taskQueue:任務佇列,用不儲存提交的任務。

  • policy:拒絕策略,主要有一下四種策略:

public enum RejectPolicy {

  ABORT,
  CALLER_RUN,
  DISCARD_OLDEST,
  DISCARD
}
  • workers:用於儲存工作執行緒。
  • isStopped:執行緒池是否被關閉了。
  • useTimed:主要是用於表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的時間內,如果沒有從任務佇列當中獲取到任務,執行緒就從執行緒池退出,但是需要保證執行緒池當中最小的執行緒個數不小於 corePoolSize 。

實現Runnable

  // 下面這個方法是向執行緒池提交任務
  public void execute(Runnable runnable) throws InterruptedException {
    checkPoolState();

    if (addWorker(runnable, false)  // 如果能夠加入新的執行緒執行任務 加入成功就直接返回
            || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 說明提交任務失敗 任務佇列已經滿了
            || addWorker(runnable, true)) // 使用能夠使用的最大的執行緒數 (maximumPoolSize) 看是否能夠產生新的執行緒
      return;

    // 如果任務佇列滿了而且不能夠加入新的執行緒 則拒絕這個任務
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

在上面的程式碼當中:

  • checkPoolState函數是檢查執行緒池的狀態,當執行緒池被停下來之後就不能夠在提交任務:
  private void checkPoolState() {
    if (isStopped) {
      // 如果執行緒池已經停下來了,就不在向任務佇列當中提交任務了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }

  • addWorker函數是往執行緒池當中提交任務並且產生一個執行緒,並且這個執行緒執行的第一個任務就是傳遞的引數。max表示執行緒的最大數目,max == true 的時候表示使用 maximumPoolSize 否則使用 corePoolSize,當返回值等於 true 的時候表示執行成功,否則表示執行失敗。
  /**
   *
   * @param runnable 需要被執行的任務
   * @param max 是否使用 maximumPoolSize
   * @return boolean
   */
  public synchronized boolean addWorker(Runnable runnable, boolean max) {

    if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

實現Callable

這個函數其實比較簡單,只需要將傳入的Callable物件封裝成一個FutureTask物件即可,因為FutureTask實現了Callable和Runnable兩個介面,然後將這個結果返回即可,得到這個物件,再呼叫物件的 get 方法就能夠得到結果。

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
    checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

拒絕策略的實現

根據前面提到的各種策略的具體實現方式,具體的程式碼實現如下所示:

  private void reject(Runnable runnable) throws InterruptedException {
    switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD: // 直接放棄這個任務
        return;
      case DISCARD_OLDEST:
        // 放棄等待時間最長的任務 也就是佇列當中的第一個任務
        taskQueue.poll();
        execute(runnable); // 重新執行這個任務
    }
  }

執行緒池關閉實現

一共兩種方式實現執行緒池關閉:

  • 直接關閉執行緒池,不管任務佇列當中的任務是否被全部執行完成。
  • 安全關閉執行緒池,先等待任務佇列當中所有的任務被執行完成,再關閉執行緒池,但是在這個過程當中不允許繼續提交任務了,這一點已經在函數 checkPoolState 當中實現了。
  // 強制關閉執行緒池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務
    isStopped = true;
    // 先等待所有的任務執行完成再關閉執行緒池
    waitForAllTasks();
    stop();
  }

  private void waitForAllTasks() {
    // 當執行緒池當中還有任務的時候 就不退出迴圈
    while (taskQueue.size() > 0) {
      Thread.yield();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

工作執行緒的工作實現

    @Override
    public void run() {
      // 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓執行緒直接執行第一個任務 不需要
      // 放入任務佇列再取出來執行了
      firstTask.run();

      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          // 是否使用時間就在這裡顯示出來了
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            // 如果當前執行緒數大於核心執行緒數 則使用 CAS 去退出 用於保證線上程安全下的退出
            // 且保證執行緒的個數不小於 corePoolSize 下面這段程式碼需要仔細分析一下
            if (ct.get() > corePoolSize) {
              do{
                i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {
                return;
              }
            }
          }else {
            task.run();
          }
        } catch (InterruptedException e) {
          // do nothing
        }
      }
    }

我們現在來仔細分析一下,執行緒退出執行緒池的時候是如何保證執行緒池當中總的執行緒數是不小於 corePoolSize 的!首先整體的框架是使用 CAS 進行實現,具體程式碼為 do ... while 操作,然後在 while 操作裡面使用 CAS 進行測試替換,如果沒有成功再次獲取 ,當執行緒池當中核心執行緒的數目小於等於 corePoolSize 的時候也需要退出迴圈,因為執行緒池當中執行緒的個數不能小於 corePoolSize 。因此使用 break 跳出迴圈的執行緒是不會退出執行緒池的。

執行緒池實現的BUG

在我們自己實現的執行緒池當中當執行緒退出的時候,workers 當中還儲存這指向這個執行緒的物件,但是當執行緒退出的時候我們還沒有在 workers 當中刪除這個物件,因此這個執行緒物件不會被垃圾回收器收集掉,但是我們這個只是一個執行緒池實現的例子而已,並不用於生產環境,只是為了幫助大家理解執行緒池的原理。

完整程式碼

package cscore.concurrent.java.threadpoolv2;


import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool {

  private AtomicInteger ct = new AtomicInteger(0); // 當前在執行任務的執行緒個數
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;

  private ArrayList<Worker> workers = new ArrayList<>();

  private volatile boolean isStopped;
  private boolean useTimed;

  public int getCt() {
    return ct.get();
  }

  public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
          , int maxTasks) {
    // please add -ea to vm options to make assert keyword enable
    assert corePoolSize > 0;
    assert maximumPoolSize > 0;
    assert keepAliveTime >= 0;
    assert maxTasks > 0;

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.unit = unit;
    this.policy = policy;
    this.keepAliveTime = keepAliveTime;
    taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
    useTimed = keepAliveTime != 0;
  }

  /**
   *
   * @param runnable 需要被執行的任務
   * @param max 是否使用 maximumPoolSize
   * @return boolean
   */
  public synchronized boolean addWorker(Runnable runnable, boolean max) {

    if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

  // 下面這個方法是向執行緒池提交任務
  public void execute(Runnable runnable) throws InterruptedException {
    checkPoolState();

    if (addWorker(runnable, false)  // 如果能夠加入新的執行緒執行任務 加入成功就直接返回
            || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 說明提交任務失敗 任務佇列已經滿了
            || addWorker(runnable, true)) // 使用能夠使用的最大的執行緒數 (maximumPoolSize) 看是否能夠產生新的執行緒
      return;

    // 如果任務佇列滿了而且不能夠加入新的執行緒 則拒絕這個任務
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

  private void reject(Runnable runnable) throws InterruptedException {
    switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD:
        return;
      case DISCARD_OLDEST:
        // 放棄等待時間最長的任務
        taskQueue.poll();
        execute(runnable);
    }
  }

  private void checkPoolState() {
    if (isStopped) {
      // 如果執行緒池已經停下來了,就不在向任務佇列當中提交任務了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
    checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

  // 強制關閉執行緒池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務
    isStopped = true;
    // 先等待所有的任務執行完成再關閉執行緒池
    waitForAllTasks();
    stop();
  }

  private void waitForAllTasks() {
    // 當執行緒池當中還有任務的時候 就不退出迴圈
    while (taskQueue.size() > 0) {
      Thread.yield();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Worker implements Runnable {

    private Thread thisThread;

    private final Runnable firstTask;
    private volatile boolean isStopped;

    public Worker(Runnable firstTask) {
      this.firstTask = firstTask;
    }

    @Override
    public void run() {
      // 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓執行緒直接執行第一個任務 不需要
      // 放入任務佇列再取出來執行了
      firstTask.run();

      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            if (ct.get() > corePoolSize) {
              do{
                i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {
                return;
              }
            }
          }else {
            task.run();
          }
        } catch (InterruptedException e) {
          // do nothing
        }
      }
    }

    public synchronized void stopWorker() {
      if (isStopped) {
        throw new RuntimeException("thread has been interrupted");
      }
      isStopped = true;
      thisThread.interrupt();
    }

  }

}

執行緒池測試

package cscore.concurrent.java.threadpoolv2;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

public class Test {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);

    for (int i = 0; i < 10; i++) {
      RunnableFuture<Integer> submit = pool.submit(() -> {
        System.out.println(Thread.currentThread().getName() + " output a");
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return 0;
      });
      System.out.println(submit.get());
    }
    int n = 15;
    while (n-- > 0) {
      System.out.println("Number Threads = " + pool.getCt());
      Thread.sleep(1000);
    }
    pool.shutDown();
  }
}

上面測試程式碼的輸出結果如下所示:

ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2

從上面的程式碼可以看出我們實現了正確的任務實現結果,同時執行緒池當中的核心執行緒數從 2 變到了 5 ,當執行緒池當中任務佇列全部別執行完成之後,執行緒的數目重新降下來了,這確實是我們想要達到的結果。

總結

在本篇文章當中主要給大家介紹瞭如何實現一個類似於JDK中的執行緒池,裡面有非常多的實現細節,大家可以仔細捋一下其中的流程,對執行緒池的理解將會非常有幫助。


以上就是本篇文章的所有內容了,我是LeHung,我們下期再見!!!更多精彩內容合集可存取專案:https://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,瞭解更多計算機(Java、Python、計算機系統基礎、演演算法與資料結構)知識。