60行從零開始自己動手寫FutureTask是什麼體驗?

2022-08-05 12:01:06

前言

在並行程式設計當中我們最常見的需求就是啟動一個執行緒執行一個函數去完成我們的需求,而在這種需求當中,我們常常需要函數有返回值。比如我們需要同一個非常大的陣列當中資料的和,讓每一個執行緒求某一個區間內部的和,最終將這些和加起來,那麼每個執行緒都需要返回對應區間的和。而在Java當中給我們提供了這種機制,去實現這一個效果——FutureTask

FutureTask

在自己寫FutureTask之前我們首先寫一個例子來回顧一下FutureTask的程式設計步驟:

  • 寫一個類實現Callable介面。
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

實現介面就實現call即可,可以看到這個函數是有返回值的,而FutureTask返回給我們的值就是這個函數的返回值。

  • new一個FutureTask物件,並且new一個第一步寫的類,new FutureTask<>(callable實現類)
  • 最後將剛剛得到的FutureTask物件傳入Thread類當中,然後啟動執行緒即可new Thread(futureTask).start();
  • 然後我們可以呼叫FutureTaskget方法得到返回的結果futureTask.get();

假如有一個陣列data,長度為100000,現在有10個執行緒,第i個執行緒求陣列[i * 10000, (i + 1) * 10000)所有資料的和,然後將這十個執行緒的結果加起來。

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    int[] data = new int[100000];
    Random random = new Random();
    for (int i = 0; i < 100000; i++) {
      data[i] = random.nextInt(10000);
    }
    @SuppressWarnings("unchecked")
    FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10);
    // 設定10個 futuretask 任務計算陣列當中資料的和
    for (int i = 0; i < 10; i++) {
      int idx = i;
      tasks[i] = new FutureTask<>(() -> {
        int sum = 0;
        for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
          sum += data[k];
        }
        return sum;
      });
    }
    // 開啟執行緒執行 futureTask 任務
    for (FutureTask<Integer> futureTask : tasks) {
      new Thread(futureTask).start();
    }
    int threadSum = 0;
    for (FutureTask<Integer> futureTask : tasks) {
      threadSum += futureTask.get();
    }
    int sum = Arrays.stream(data).sum();
    System.out.println(sum == threadSum); // 結果始終為 true
  }
}

可能你會對FutureTask的使用方式感覺困惑,或者不是很清楚,現在我們來仔細捋一下思路。

  1. 首先啟動一個執行緒要麼是繼承自Thread類,然後重寫Thread類的run方法,要麼是給Thread類傳遞一個實現了Runnable的類物件,當然可以用匿名內部類實現。
  2. 既然我們的FutureTask物件可以傳遞給Thread類,說明FutureTask肯定是實現了Runnable介面,我們現在來看一下FutureTask的繼承體系。

​ 可以發現的是FutureTask確實實現了Runnable介面,同時還實現了Future介面,這個Future介面主要提供了後面我們使用FutureTask的一系列函數比如get

  1. 看到這裡你應該能夠大致想到在FutureTask中的run方法會呼叫Callable當中實現的call方法,然後將結果儲存下來,當呼叫get方法的時候再將這個結果返回。

自己實現FutureTask

工具準備

經過上文的分析你可能已經大致瞭解了FutureTask的大致執行過程了,但是需要注意的是,如果你執行FutureTaskget方法是可能阻塞的,因為可能Callablecall方法還沒有執行完成。因此在get方法當中就需要有阻塞執行緒的程式碼,但是當call方法執行完成之後需要將這些執行緒都喚醒。

在本篇文章當中使用鎖ReentrantLock和條件變數Condition進行執行緒的阻塞和喚醒,在我們自己動手實現FutureTask之前,我們先熟悉一下上面兩種工具的使用方法。

  • ReentrantLock主要有兩個方法:
    • lock對臨界區程式碼塊進行加鎖。
    • unlock對臨界區程式碼進行解鎖。
  • Condition主要有三個方法:
    • await阻塞呼叫這個方法的執行緒,等待其他執行緒喚醒。
    • signal喚醒一個被await方法阻塞的執行緒。
    • signalAll喚醒所有被await方法阻塞的執行緒。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {

  private ReentrantLock lock;
  private Condition condition;

  LockDemo() {
    lock = new ReentrantLock();
    condition = lock.newCondition();
  }

  public void blocking() {
    lock.lock();
    try {
      System.out.println(Thread.currentThread() + " 準備等待被其他執行緒喚醒");
      condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }finally {
      lock.unlock();
    }
  }

  public void inform() throws InterruptedException {
    // 先休眠兩秒 等他其他執行緒先阻塞
    TimeUnit.SECONDS.sleep(2);
    lock.lock();
    try {
      System.out.println(Thread.currentThread() + " 準備喚醒其他執行緒");
      condition.signal(); // 喚醒一個被 await 方法阻塞的執行緒
      // condition.signalAll(); // 喚醒所有被 await 方法阻塞的執行緒
    }finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    LockDemo lockDemo = new LockDemo();
    Thread thread = new Thread(() -> {
      lockDemo.blocking(); // 執行阻塞執行緒的程式碼
    }, "Blocking-Thread");
    Thread thread1 = new Thread(() -> {
      try {
        lockDemo.inform(); // 執行喚醒執行緒的程式碼
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }, "Inform-Thread");
    thread.start();
    thread1.start();
  }
}

上面的程式碼的輸出:

Thread[Blocking-Thread,5,main] 準備等待被其他執行緒喚醒
Thread[Inform-Thread,5,main] 準備喚醒其他執行緒

FutureTask設計與實現

在前文當中我們已經談到了FutureTask的實現原理,主要有以下幾點:

  • 建構函式需要傳入一個實現了Callable介面的類物件,這個將會在FutureTaskrun方法執行,然後得到函數的返回值,並且將返回值儲存起來。
  • 當執行緒呼叫get方法的時候,如果這個時候Callable當中的call已經執行完成,直接返回call函數返回的結果就行,如果call函數還沒有執行完成,那麼就需要將呼叫get方法的執行緒掛起,這裡我們可以使用condition.await()將執行緒掛起。
  • call函數執行完成之後,需要將之前被get方法掛起的執行緒喚醒繼續執行,這裡使用condition.signalAll()將所有掛起的執行緒喚醒。
  • 因為是我們自己實現FutureTask,功能不會那麼齊全,只需要能夠滿足我們的主要需求即可,主要是幫助大家瞭解FutureTask原理。

實現程式碼如下(分析都在註釋當中):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// 這裡需要實現 Runnable 介面,因為需要將這個物件放入 Thread 類當中
// 而 Thread 要求傳入的物件實現了 Runnable 介面
public class MyFutureTask<V> implements Runnable {

  private final Callable<V> callable;
  private Object returnVal; // 這個表示我們最終的返回值
  private final ReentrantLock lock;
  private final Condition condition;

  public MyFutureTask(Callable<V> callable) {
    // 將傳入的 callable 物件儲存起來 方便在後面的 run 方法當中呼叫
    this.callable = callable;
    lock = new ReentrantLock();
    condition = lock.newCondition();
  }

  @SuppressWarnings("unchecked")
  public V get(long timeout, TimeUnit unit) {
    if (returnVal != null) // 如果符合條件 說明 call 函數已經執行完成 返回值已經不為 null 了
      return (V) returnVal; // 直接將結果返回即可 這樣不用競爭鎖資源 提高程式執行效率
    lock.lock();
    try {
      // 這裡需要進行二次判斷 (雙重檢查)
      // 因為如果一個執行緒在第一次判斷 returnVal 為空
      // 然後這個時候它可能因為獲取鎖而被掛起
      // 而在被掛起的這段時間,call 可能已經執行完成
      // 如果這個時候不進行判斷直接執行 await方法
      // 那後面這個執行緒將無法被喚醒
      if (returnVal == null)
        condition.await(timeout, unit);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    return (V) returnVal;
  }

  @SuppressWarnings("unchecked")
  public V get() {
    if (returnVal != null)
      return (V) returnVal;
    lock.lock();
    try {
      // 同樣的需要進行雙重檢查
      if (returnVal == null)
      	condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    return (V) returnVal;
  }


  @Override
  public void run() {
    if (returnVal != null)
      return;
    try {
      // 在 Runnable 的 run 方法當中
      // 執行 Callable 方法的 call 得到返回結果
      returnVal = callable.call();
    } catch (Exception e) {
      e.printStackTrace();
    }
    lock.lock();
    try {
      // 因為已經得到了結果
      // 因此需要將所有被 await 方法阻塞的執行緒喚醒
      // 讓他們從 get 方法返回
      condition.signalAll();
    }finally {
      lock.unlock();
    }
  }
	// 下面是測試程式碼
  public static void main(String[] args) {
    MyFutureTask<Integer> ft = new MyFutureTask<>(() -> {
      TimeUnit.SECONDS.sleep(2);
      return 101;
    });
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 輸出為 null
    System.out.println(ft.get()); // 輸出為 101
  }
}

我們現在用我們自己寫的MyFutureTask去實現在前文當中陣列求和的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  int[] data = new int[100000];
  Random random = new Random();
  for (int i = 0; i < 100000; i++) {
    data[i] = random.nextInt(10000);
  }
  @SuppressWarnings("unchecked")
  MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10);
  for (int i = 0; i < 10; i++) {
    int idx = i;
    tasks[i] = new MyFutureTask<>(() -> {
      int sum = 0;
      for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
        sum += data[k];
      }
      return sum;
    });
  }
  for (MyFutureTask<Integer> MyFutureTask : tasks) {
    new Thread(MyFutureTask).start();
  }
  int threadSum = 0;
  for (MyFutureTask<Integer> MyFutureTask : tasks) {
    threadSum += MyFutureTask.get();
  }
  int sum = Arrays.stream(data).sum();
  System.out.println(sum == threadSum); // 輸出結果為 true
}

總結

在本篇文章當中主要給大家介紹了FutureTask的內部原理,並且我們自己通過使用ReentrantLockCondition實現了我們自己的FutureTask,本篇文章的主要內容如下:

  • FutureTask的內部原理:
    • FutureTask首先會繼承Runnable介面,這樣就可以將FutureTask的物件直接放入Thread類當中,作為建構函式的引數。
    • 我們在使用FutureTask的時候需要傳入一個Callable實現類的物件,在函數call當中實現我們需要執行的函數,執行完成之後,將call函數的返回值儲存下來,當有執行緒呼叫get方法時候將儲存的返回值返回。
  • 我們使用條件變數進行對執行緒的阻塞和喚醒。
    • 當有執行緒呼叫get方法時,如果call已經執行完成,那麼可以直接將結果返回,否則需要使用條件變數將執行緒掛起。
    • call函數執行完成的時候,需要使用條件變數將所有阻塞在get方法的執行緒喚醒。
  • 雙重檢查:
    • 我們在get方法當中首先判斷returnVal是否為空,如果不為空直接將結果返回,這就可以不用去競爭鎖資源了,可以提高程式執行的效率。
    • 但是我們在使用鎖保護的臨界區還需要進行判斷,判斷returnVal是否為空,因為如果一個執行緒在第一次判斷 returnVal 為空,然後這個時候它可能因為獲取鎖而被掛起, 而在被掛起的這段時間,call 可能已經執行完成,如果這個時候不進行判斷直接執行 await方法,那後面這個執行緒將無法被喚醒,因為在call函數執行完成之後呼叫了condition.signalAll(),如果執行緒在這之後執行await方法,那麼將來再沒有執行緒去將這些執行緒喚醒。

更多精彩內容合集可存取專案:https://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,瞭解更多計算機(Java、Python、計算機系統基礎、演演算法與資料結構)知識。