我們先來打個比方,執行緒池就好像一個工具箱,我們每次需要擰螺絲的時候都要從工具箱裡面取出一個螺絲刀來。有時候需要取出一個來擰,有時候螺絲多的時候需要多個人取出多個來擰,擰完自己的螺絲那麼就會把螺絲刀再放回去,然後別人下次用的時候再取出來用。
說白了執行緒池就是相當於「提前申請了一些資源,也就是執行緒」,需要的時候就從執行緒池中取出執行緒來處理一些事情,處理完畢之後再把執行緒放回去。
我們來思考一個問題,為什麼需要執行緒池呢?假如沒有執行緒池的話我們每次呼叫執行緒是什麼樣子的?
顯然首先是先建立一個執行緒,然後把任務交給這個執行緒,最後把這個執行緒銷燬掉。這樣實現起來非常簡便,但是就會有一個問題:如果並行的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒是需要消耗時間的。
那麼如果我們改用執行緒池的話,在程式執行的時候就會首先建立一批執行緒,然後交給執行緒池來管理。有需要的時候我們從執行緒池中取出執行緒用於處理任務,用完後我們再將其回收到執行緒池中,這樣是不是就避免了每次都需要建立和銷燬執行緒這種耗時的操作。
有人會說你使用執行緒池一開始就消耗了一些記憶體,之後一直不釋放這些記憶體,這樣豈不是有點浪費。其實這是類似於空間換時間的概念,我們確實多佔用了一點記憶體但是這些記憶體和我們珍惜出來的這些時間相比,是非常划算的。
池的概念是一種非常常見的空間換時間的概念,除了有執行緒池之外還有程序池、記憶體池等等。其實他們的思想都是一樣的就是我先申請一批資源出來,然後就隨用隨拿,不用再放回來。
執行緒池的組成主要分為 3 個部分,這三部分配合工作就可以得到一個完整的執行緒池:
由於本篇是對執行緒池的簡單介紹,所以簡化了一下執行緒池的模型,將 1.3 中的「3. 管理者執行緒」的角色給去除了。
/* 任務結構體 */
typedef struct
{
void (*function)(void *);
void *arg;
} threadpool_task_t;
/* 執行緒池結構體 */
typedef struct
{
pthread_mutex_t lock; // 執行緒池鎖,鎖整個的執行緒池
pthread_cond_t notify; // 條件變數,用於告知執行緒池中的執行緒來任務了
int thread_count; // 執行緒池中的工作執行緒總數
pthread_t *threads; // 執行緒池中的工作執行緒
int started; // 執行緒池中正在工作的執行緒個數
threadpool_task_t *queue; // 任務佇列
int queue_size; // 任務佇列能容納的最大任務數
int head; // 隊頭 -> 取任務
int tail; // 隊尾 -> 放任務
int count; // 任務佇列中剩餘的任務個數
int shutdown; // 執行緒池狀態, 0 表示執行緒池可用,其餘值表示關閉
} threadpool_t;
thread_count 和 started 的區別:
shutdown 的作用:如果需要銷燬執行緒池,那麼必須要現將所有的執行緒退出才可銷燬,而 shutdown 就是用於告知正在工作中的執行緒,執行緒池是否關閉用的。關閉方式又分為兩種:一種是立即關閉,即不管任務佇列中是否還有任務;另一種是優雅的關閉,即先處理完任務佇列中的任務後再關閉。這兩種方式可通過設定 shutdown 的不同取值即可實現:
typedef enum
{
immediate_shutdown = 1, // 立即關閉執行緒池
graceful_shutdown = 2 // 等執行緒池中的任務全部處理完成後,再關閉執行緒池
} threadpool_shutdown_t;
函數原型:int ThreadPool_Init(int thread_count, int queue_size, threadpool_t **ppstThreadPool);
頭 文 件:#include "ThrdPool.h"
函數功能:初始化執行緒池
引數描述:
返 回 值:成功返回 E_SUCCEED,失敗返回 E_ERROR
函數原型:int ThreadPool_Dispatch(threadpool_t *pstThreadPool, void (*function)(void *), void *arg);
頭 文 件:#include "ThrdPool.h"
函數功能:向執行緒池的任務佇列中分發任務
引數描述:
返 回 值:成功返回 E_SUCCEED,失敗返回 E_ERROR
函數原型:void Threadpool_Destroy(threadpool_t *pool, threadpool_shutdown_t shutdown_mode);
頭 文 件:#include "ThrdPool.h"
函數功能:銷燬執行緒池
引數描述:
#ifndef __THRDPOOL_H__
#define __THRDPOOL_H__
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#define DEBUG(format, args...) \
printf("[%s:%d] "format"\n", \
__FILE__, \
__LINE__, \
##args)
#define MAX_THREADS 16 // 執行緒池最大工作執行緒個數
#define MAX_QUEUE 256 // 執行緒池工作佇列上限
#define E_SUCCEED 0
#define E_ERROR 112
#define SAFE_FREE(ptr) \
if (ptr) \
{ \
free(ptr); \
ptr = NULL; \
}
/* 任務結構體 */
typedef struct
{
void (*function)(void *);
void *arg;
} threadpool_task_t;
/* 執行緒池結構體 */
typedef struct
{
pthread_mutex_t lock; // 執行緒池鎖,鎖整個的執行緒池
pthread_cond_t notify; // 條件變數,用於告知執行緒池中的執行緒來任務了
int thread_count; // 執行緒池中的工作執行緒總數
pthread_t *threads; // 執行緒池中的工作執行緒
int started; // 執行緒池中正在工作的執行緒個數
threadpool_task_t *queue; // 任務佇列
int queue_size; // 任務佇列能容納的最大任務數
int head; // 隊頭 -> 取任務
int tail; // 隊尾 -> 放任務
int count; // 任務佇列中剩餘的任務個數
int shutdown; // 執行緒池狀態, 0 表示執行緒池可用,其餘值表示關閉
} threadpool_t;
typedef enum
{
immediate_shutdown = 1, // 立即關閉執行緒池
graceful_shutdown = 2 // 等執行緒池中的任務全部處理完成後,再關閉執行緒池
} threadpool_shutdown_t;
int ThreadPool_Init(int thread_count, int queue_size, threadpool_t **ppstThreadPool);
int ThreadPool_Dispatch(threadpool_t *pstThreadPool, void (*function)(void *), void *arg);
void Threadpool_Destroy(threadpool_t *pool, threadpool_shutdown_t shutdown_mode);
#endif
#include <stdio.h>
#include "ThrdPool.h"
#define THREAD_COUNT 4
#define QUEUE_SIZE 128
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 靜態初始化鎖,用於保證第16行的完整輸出
void func(void *arg)
{
static int num = 0;
pthread_mutex_lock(&mutex);
// 為方便觀察,故特意輸出該語句,並使用num來區分不同的任務
DEBUG("這是執行的第 %d 個任務", ++num);
usleep(100000); // 模擬任務耗時,100ms
pthread_mutex_unlock(&mutex);
return;
}
int main()
{
int iRet;
threadpool_t *pool;
iRet = ThreadPool_Init(THREAD_COUNT, QUEUE_SIZE, &pool);
if (iRet != E_SUCCEED)
{
return 0;
}
int i;
for (i = 0; i < 20; i++) // 生產者,向任務佇列中塞入 20 個任務
{
ThreadPool_Dispatch(pool, func, NULL);
}
usleep(500000);
// Threadpool_Destroy(pool, immediate_shutdown); // 立刻關閉執行緒池
Threadpool_Destroy(pool, graceful_shutdown); // 等任務佇列中的任務全部執行完畢再關閉
return 0;
}
#include <stdio.h>
#include "ThrdPool.h"
#define THREAD_COUNT 4
#define QUEUE_SIZE 128
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 靜態初始化鎖,用於保證第 15 行的完整輸出
void func(void *arg)
{
static int num = 0;
pthread_mutex_lock(&mutex);
DEBUG("這是執行的第 %d 個任務", ++num); // 為方便觀察,故特意輸出該語句,並使用num來區分不同的任務
usleep(100000); // 模擬任務耗時,100ms
pthread_mutex_unlock(&mutex);
return;
}
int main()
{
int iRet;
threadpool_t *pool;
iRet = ThreadPool_Init(THREAD_COUNT, QUEUE_SIZE, &pool);
if (iRet != E_SUCCEED)
{
return 0;
}
int i;
for (i = 0; i < 20; i++) // 生產者,向任務佇列中塞入 20 個任務
{
ThreadPool_Dispatch(pool, func, NULL);
}
usleep(500000);
// Threadpool_Destroy(pool, immediate_shutdown); // 立刻關閉執行緒池
Threadpool_Destroy(pool, graceful_shutdown); // 等任務執行完畢後方可關閉
return 0;
}