在標頭檔案 threads.h 中,定義和宣告了支援多執行緒的宏、型別和函數。所有直接與執行緒相關的識別符號,均以字首 thrd_ 作為開頭。例如,thrd_t 是一個物件型別,它標識了一個執行緒。
函數 thrd_create()用於建立並開始執行一個新執行緒。函數 thrd_create()的其中一個引數為在新執行緒中需要被執行的函數 thrd_create()的其中一個引數為在新執行緒中需要被執行的函數。thrd_create()的完整原型是:
int thrd_create(thrd_t *thr, thrd_start_t func, void *arg);
引數 func 是一個指標,它指向在新執行緒需要被執行的函數,而 void 指標 arg 用於向該函數傳遞引數。換句話說,新執行緒將執行函數呼叫 func(arg)。
引數 func 的型別為 thrd_start_t,它被定義為 int(*)(void*)(這是一個函數指標,指向一個 void 指標作為其引數並返回一個 int 值的函數),因此,該執行緒執行的函數返回一個 int 型別的值。
程式在後續過程中可以通過呼叫函數 thread_join()獲得這個 int 型別的返回值(必要時,需等待該執行緒執行完)。
如果一個執行緒啟動成功,函數 thread_create()將新執行緒寫入一個物件進行標識,並通過引數 thr 指向該物件,然後返回宏值 thread_success。
在大多數情況下,後續的其他操作均依賴於該執行緒的執行結果,並且只有當該執行緒完成後,才能執行其他操作。函數 thread_join()用於確保一個執行緒已完成。它的原型是:
int thrd_join(thrd_t thr, int *result);
呼叫 thread_join()的執行緒會被阻塞,直到通過 thr 標識的執行緒執行完成,這裡“阻塞”(block)指的是:執行緒會在呼叫 thread_join()的位置停留必要的時間。然後,thread_join()將執行緒 thr 中執行函數的返回值寫入指標 result 所參照的 int 變數中,假設 result 不是一個空指標。最後,thread_join()釋放屬於執行緒 thr 的所有資源。
如果程式邏輯上並不需要等待執行緒 thr 結束,則應該呼叫以下函數:
int thrd_detach(thrd_t thr);
thrd_detach()使得當執行緒 thr 執行完成後,自動釋放執行緒佔用的所有資源。一旦一個執行緒執行了分離操作(呼叫 thrd_detach()),將不用程式等待其結束,程式也不會獲得該執行緒執行函數的返回值。
對於每個建立的執行緒,呼叫 thread_join()或 thread_detach()不得超過一次。
在例 1 中的程式展示了使用並行操作處理陣列的一種方式。各個執行緒先自行處理陣列的各部分,然後將它們的處理結果組合在一起。該程式僅需計算一個數位序列的總和。
函數 sum()首先根據建立執行緒的最大數量確定劃分陣列所得的各組元素的最大數量,然後呼叫遞迴輔助函數 parallel_sum()。
函數 parallel_sum()將陣列平均分為兩半,將其中的一半交給一個新執行緒處理,同時呼叫自身來處理另一半陣列。如該例所示,一個執行緒函數需要多個引數,這些引數通常採用結構進行封裝。
【例1】在幾個並行執行緒中計算陣列元素的和
#include <stdbool.h>
#include <threads.h>
#define MAX_THREADS 8 // 1、2、4、8……所建立執行緒數量的最大值
#define MIN_BLOCK_SIZE 100 // 一個陣列塊的最小值
typedef struct // 函數parallel_sum()的引數
{
float *start; // 傳遞給parallel_sum()的陣列塊的起始地址
int len; // 陣列塊長度
int block_size; // 最小陣列塊的大小
double sum; // 求和結果
} Sum_arg;
int parallel_sum(void *arg); // 執行緒函數的原型
// ---------------------------------------------------------------
// 計算陣列元素的和,並寫入*sumPtr
// sum()呼叫函數parallel_sum()進行併行處理
// 返回值:如果沒有發生錯誤,則返回true;否則,返回false
bool sum(float arr[], int len, double* sumPtr)
{
int block_size = len / MAX_THREADS;
if (block_size < MIN_BLOCK_SIZE) block_size = len;
Sum_arg args = { arr, len, block_size, 0.0 };
if (parallel_sum(&args))
{ *sumPtr = args.sum; return true; }
else
return false;
}
// ---------------------------------------------------------------
// 遞迴輔助函數,用以將工作分解到幾個執行緒中處理
int parallel_sum(void *arg)
{
Sum_arg *argp = (Sum_arg*)arg; // 指向引數的指標
if (argp->len <= argp->block_size) // 如果length <= block_size,
// 對所有元素求和
{
for (int i = 0; i < argp->len; ++i)
argp->sum += argp->start[i];
return 1;
}
else // 如果length > block_size,
// 分解陣列
{
int mid = argp->len / 2;
Sum_arg arg2 = { argp->start+mid, argp->len-mid,
argp->block_size, 0}; // 指定後一半陣列
argp->len = mid; // 前一半陣列的長度
thrd_t th; // 在新執行緒中處理前一半陣列
int res = 0;
if (thrd_create(&th, parallel_sum, arg) != thrd_success)
return 0; // 沒能成功建立新執行緒
if (!parallel_sum(&arg2)) // 在當前執行緒下,以遞迴方式處理後一半陣列
{
thrd_detach(th); return 0; // 遞回呼叫失敗
}
thrd_join(th, &res);
if (!res)
return 0; // 同級執行緒報告執行失敗
argp->sum += arg2.sum;
return 1;
}
}