C11 標準為執行緒間通訊提供了
條件變數(condition variable)。執行緒可以使用條件變數,以等待來自另一個執行緒的通知,通知告知了指定的條件已被滿足。例如,這類通知可能代表某些資料已經準備好進行處理。
條件變數由型別為 cnd_t 的物件表示,並配合互斥一起使用。一般過程如下:執行緒獲得互斥,然後測試條件。如果條件不滿足,則執行緒繼續等待條件變數(釋放互斥),直到另一個執行緒再次喚醒它,然後該執行緒再次獲得互斥,並再次測試條件,重複上述過程,直到條件滿足。
標頭檔案 threads.h 定義了使用條件變數的函數,它們如下所示:
int cnd_init(cnd_t*cond);
初始化 cond 參照的條件變數。
void cnd_destroy(cnd_t*cond);
釋放指定條件變數使用的所有資源。
int cnd_signal(cnd_t*cond);
在等待指定條件變數的任意數量的執行緒中,喚醒其中一個執行緒。
int cnd_broadcast(cnd_t*cond);
喚醒所有等待指定條件變數的執行緒。
int cnd_wait(cnd_t*cond,mtx_t*mtx);
阻塞正在呼叫的執行緒,並釋放指定的互斥。在呼叫 cnd_wait()之前,執行緒必須持有互斥。如果另一執行緒通過傳送一個信號解除當前執行緒的阻塞(也就是說,通過指定同樣的條件變數作為引數呼叫 cond_signal()或 cnd_broadcast()),那麼呼叫 cnd_wait()的執行緒在 cnd_wait()返回之前會再次獲得互斥。
int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);
與 cnd_wait()類似,cnd_timedwait()阻塞呼叫它們的執行緒,但僅維持由引數 ts 指定的時間。可以通過呼叫函數 timespec_get()獲得一個 struct timespec 物件,它表示當前時間。
除 cnd_destroy()以外的所有條件變數函數,如果它們引發錯誤,則返回值 thrd_error,否則返回值 thrd_success。當時間達到限定值時,函數 cnd_timedwait()也會返回值 thrd_timedout。
例 1 與例 2 中的程式展示了在常見的“生產者-消費者”模型中使用條件變數。程式為每個生產者和消費者開啟一個新執行緒。生產者將一個新產品(在我們的範例中,新產品為一個 int 變數)放入一個環形緩衝區中,假設這個緩衝區沒有滿,然後通知等待的消費者:產品已經準備好。每個消費者從該緩衝區中取出產品,然後將實際情況通知給正在等待的生產者。
在任一特定時間,只有一個執行緒可以修改環形緩衝器。因此,在函數 bufPut()和 bufGet()間將存線上程同步問題,
函數 bufPut()將一個元素插入到緩衝區,函數 buf-Get()將一個元素從緩衝區移除。
有兩個條件變數:生產者等待其中一個條件變數,以判斷緩衝器是否滿了;消費者等待另一個條件變數,以判斷緩衝器是否空了。緩衝區的所有必需元素都包括在結構 Buffer 中。函數 bufInit()初始化具有指定大小的 Buffer 物件,而函數 bufDestroy()銷毀 Buffer 物件。
【例1】用於“生產者-消費者”模型的環形緩衝區
/* buffer.h
* 用於執行緒安全緩衝區的所有宣告
*/
#include <stdbool.h>
#include <threads.h>
typedef struct Buffer
{
int *data; // 指向資料陣列的指標
size_t size, count; // 元素數量的最大值和當前值
size_t tip, tail; // tip = 下一個空點的索引
mtx_t mtx; // 一個互斥
cnd_t cndPut, cndGet; // 兩個條件變數
} Buffer;
bool bufInit( Buffer *bufPtr, size_t size );
void bufDestroy(Buffer *bufPtr);
bool bufPut(Buffer *bufPtr, int data);
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);
/* -------------------------------------------------------------
* buffer.c
* 定義用於處理Buffer的函數
*/
#include "buffer.h"
#include <stdlib.h> // 為了使用malloc()和free()
bool bufInit( Buffer *bufPtr, size_t size)
{
if ((bufPtr->data = malloc( size * sizeof(int))) == NULL)
return false;
bufPtr->size = size;
bufPtr->count = 0;
bufPtr->tip = bufPtr->tail = 0;
return mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success
&& cnd_init( &bufPtr->cndPut) == thrd_success
&& cnd_init( &bufPtr->cndGet) == thrd_success;
}
void bufDestroy(Buffer *bufPtr)
{
cnd_destroy( &bufPtr->cndGet );
cnd_destroy( &bufPtr->cndPut );
mtx_destroy( &bufPtr->mtx );
free( bufPtr->data );
}
// 在緩衝區中插入一個新元素
bool bufPut(Buffer *bufPtr, int data)
{
mtx_lock( &bufPtr->mtx );
while (bufPtr->count == bufPtr->size)
if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success)
return false;
bufPtr->data[bufPtr->tip] = data;
bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size;
++bufPtr->count;
mtx_unlock( &bufPtr->mtx );
cnd_signal( &bufPtr->cndGet );
return true;
}
// 從緩衝區中移除一個元素
// 如果緩衝區是空的,則等待不超過sec秒
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)
{
struct timespec ts;
timespec_get( &ts, TIME_UTC ); // 當前時間
ts.tv_sec += sec; // + sec秒延時
mtx_lock( &bufPtr->mtx );
while ( bufPtr->count == 0 )
if (cnd_timedwait(&bufPtr->cndGet,
&bufPtr->mtx, &ts) != thrd_success)
return false;
*dataPtr = bufPtr->data[bufPtr->tail];
bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size;
--bufPtr->count;
mtx_unlock( &bufPtr->mtx );
cnd_signal( &bufPtr->cndPut );
return true;
}
例 2 中的 main()函數建立了一個緩衝區,並啟動了若干個生產者和消費者執行緒,給予每個執行緒一個識別號碼和一個指向緩衝區的指標。每個生產者執行緒建立一定數量的“產品”,然後用一個 return 語句退出。
一個消費者執行緒如果在給定延時期間無法獲得產品以進行消費,則直接返回。
【例2】啟動生產者和消費者執行緒
// producer_consumer.c
#include "buffer.h"
#include <stdio.h>
#include <stdlib.h>
#define NP 2 // 生產者的數量
#define NC 3 // 消費者的數量
int producer(void *); // 執行緒函數
int consumer(void *);
struct Arg { int id; Buffer *bufPtr; }; // 執行緒函數的引數
_Noreturn void errorExit(const char* msg)
{
fprintf(stderr, "%sn", msg); exit(0xff);
}
int main(void)
{
printf("Producer-Consumer Demonn");
Buffer buf; // 為5個產品建立一個緩衝區
bufInit( &buf, 5 );
thrd_t prod[NP], cons[NC]; // 執行緒
struct Arg prodArg[NP], consArg[NC]; // 執行緒的引數
int i = 0, res = 0;
for ( i = 0; i < NP; ++i ) // 啟動生產者
{
prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
errorExit("Thread error.");
}
for ( i = 0; i < NC; ++i ) // 啟動消費者
{
consArg[i].id = i+1, consArg[i].bufPtr = &buf;
if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
errorExit("Thread error.");
}
for ( i = 0; i < NP; ++i ) // 等待執行緒結束
thrd_join(prod[i], &res),
printf("nProducer %d ended with result %d.n", prodArg[i].id, res);
for ( i = 0; i < NC; ++i )
thrd_join(cons[i], &res),
printf("Consumer %d ended with result %d.n", consArg[i].id, res);
bufDestroy( &buf );
return 0;
}
int producer(void *arg) // 生產者執行緒函數
{
struct Arg *argPtr = (struct Arg *)arg;
int id = argPtr->id;
Buffer *bufPtr = argPtr->bufPtr;
int count = 0;
for (int i = 0; i < 10; ++i)
{
int data = 10*id + i;
if (bufPut( bufPtr, data ))
printf("Producer %d produced %dn", id, data), ++count;
else
{ fprintf( stderr,
"Producer %d: error storing %dn", id, data);
return -id;
}
}
return count;
}
int consumer(void *arg) // 消費者執行緒函數
{
struct Arg *argPtr = (struct Arg *)arg;
int id = argPtr->id;
Buffer *bufPtr = argPtr->bufPtr;
int count = 0;
int data = 0;
while (bufGet( bufPtr, &data, 2 ))
{
++count;
printf("Consumer %d consumed %dn", id, data);
}
return count;
}