執行緒池: 一種執行緒使用模式。執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可並行執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的並行處理器、處理器核心、記憶體、網路sockets等的數量
執行緒池的價值:
執行緒池中首先需要有很多個執行緒,使用者可以自己選擇建立多少個執行緒。為了實現執行緒間的同步與互斥,還需要增加兩個變數——互斥量和條件變數。我們還需要一個任務佇列,主執行緒不斷往裡面塞任務,執行緒池的執行緒不斷去處理。需要注意的是:這裡的任務佇列可以為空,但不能滿,所以任務佇列的容量不限定(實際場景中,任務佇列容量不夠就需要考慮換一臺更大的伺服器)
執行緒池的四個成員變數:
執行緒池:首先需要建立幾個執行緒,還有一個任務佇列,當任務佇列有任務的時候就喚醒一個正在等待的執行緒,讓執行緒去執行任務,執行緒池中的執行緒執行完任務不會銷燬,大大減少的cpu的消耗。
需要兩個條件變數和一個互斥鎖,這個互斥鎖用來鎖住任務佇列,因為任務佇列是公共資源,其次還需要兩個條件變數,一個條件變數用來阻塞取任務的執行緒,當佇列中有任務的時候,直接取任務,然後解鎖,當任務佇列中沒有任務的時候,解鎖等待條件,條件滿足搶鎖,取任務,解鎖。另一個條件變數用來阻塞新增者程序,當任務佇列滿了,會讓新增者程序等待,當有執行緒取走一個任務的時候,會喚醒新增者程序。
任務函數
這裡的任務函數採用的時回撥函數的方式,提高了程式碼的通用性,可以根據自己的需求改寫任務函數
//任務回撥函數
void taskRun(void *arg)
{
PoolTask *task = (PoolTask*)arg;
int num = task->tasknum;
printf("task %d is runing %lu\n",num,pthread_self());
sleep(1);
printf("task %d is done %lu\n",num,pthread_self());
}
執行緒池的主要程式碼框架
class ThreadPool
{
public:
//建構函式,初始化執行緒池
ThreadPool(int thrnum, int maxtasknum)
{
}
static void* thrRun(void* arg)
{
}
//解構函式,摧毀執行緒池
~ThreadPool()
{
}
public:
//新增任務到執行緒池
void addtask(){};
private:
//任務佇列相關的引數
int max_job_num;//最大任務個數
int job_num;//實際任務個數
PoolTask *tasks;//任務佇列陣列
int job_push;//入隊位置
int job_pop;// 出隊位置
//執行緒相關引數
int thr_num;//執行緒池內執行緒個數
pthread_t *threads;//執行緒池內執行緒陣列
int shutdown;//是否關閉執行緒池
pthread_mutex_t pool_lock;//執行緒池的鎖
pthread_cond_t empty_task;//任務佇列為空的條件
pthread_cond_t not_empty_task;//任務佇列不為空的條件
};
放任務: 主執行緒無腦往任務佇列中塞任務,塞任務之前進行加鎖,塞完任務解鎖,如果任務佇列已經滿了,等待執行緒取任務,然後喚醒在條件變數下等待的佇列;放入了任務就給執行緒傳送訊號,喚醒執行緒來取
取任務: 執行緒池中的執行緒從任務佇列中取任務,需要對任務佇列上鎖,因為對公共資源的操作都需要上鎖,如果沒有任務就阻塞,等待放任務喚醒;如果取完了一個任務,就喚醒新增任務
這就是兩個條件變數和一個互斥鎖的用法
//新增任務到執行緒池
void addtask()
{
pthread_mutex_lock(&(this->pool_lock));
//實際任務總數大於最大任務個數則阻塞等待(等待任務被處理)
while(this->max_job_num <= this->job_num)
{
pthread_cond_wait(&(this->empty_task),&(this->pool_lock));
}
int taskpos = (this->job_push++)%this->max_job_num;
this->tasks[taskpos].tasknum = beginnum++;
this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];
this->tasks[taskpos].task_func = taskRun;
this->job_num++;
pthread_mutex_unlock(&(this->pool_lock));
pthread_cond_signal(&(this->not_empty_task));//通知包身工
}
//取任務
static void* thrRun(void* arg)
{
ThreadPool *pool = (ThreadPool*)arg;
int taskpos = 0;//任務位置
PoolTask *task = new PoolTask();
while(1)
{
//獲取任務,先要嘗試加鎖
pthread_mutex_lock(&pool->pool_lock);
//無任務並且執行緒池不是要摧毀
while(pool->job_num <= 0 && !pool->shutdown )
{
//如果沒有任務,執行緒會阻塞
pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);
}
if(pool->job_num)
{
//有任務需要處理
taskpos = (pool->job_pop++)%pool->max_job_num;
//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
//為什麼要拷貝?避免任務被修改,生產者會新增任務
memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));
task->arg = task;
pool->job_num--;
//task = &thrPool->tasks[taskpos];
pthread_cond_signal(&pool->empty_task);//通知生產者
}
if(pool->shutdown)
{
//代表要摧毀執行緒池,此時執行緒退出即可
//pthread_detach(pthread_self());//臨死前分家
pthread_mutex_unlock(&pool->pool_lock);
delete(task);
pthread_exit(NULL);
}
//釋放鎖
pthread_mutex_unlock(&pool->pool_lock);
task->task_func(task->arg);//執行回撥函數
}
}
整體程式碼:
#include<iostream>
#include<string.h>
#include<pthread.h>
#include<sys/types.h>
#include<stdio.h>
#include<unistd.h>
using namespace std;
int beginnum = 1;
class PoolTask
{
public:
int tasknum;//模擬任務編號
void *arg;//回撥函數引數
void (*task_func)(void *arg);//任務的回撥函數
};
//任務回撥函數
void taskRun(void *arg)
{
PoolTask *task = (PoolTask*)arg;
int num = task->tasknum;
printf("task %d is runing %lu\n",num,pthread_self());
sleep(1);
printf("task %d is done %lu\n",num,pthread_self());
}
class ThreadPool
{
public:
//建構函式,初始化執行緒池
ThreadPool(int thrnum, int maxtasknum)
{
this->thr_num = thrnum;
this->max_job_num = maxtasknum;
this->shutdown = 0;//是否摧毀執行緒池,1代表摧毀
this->job_push = 0;//任務佇列新增的位置
this->job_pop = 0;//任務佇列出隊的位置
this->job_num = 0;//初始化的任務個數為0
//申請最大的任務佇列
this->tasks = new PoolTask[thrnum];
//初始化鎖和條件變數
pthread_mutex_init(&(this->pool_lock),NULL);
pthread_cond_init(&(this->empty_task),NULL);
pthread_cond_init(&(this->not_empty_task),NULL);
int i = 0;
this->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申請n個執行緒id的空間
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(i = 0;i < thrnum;i++)
{
pthread_create(&(this->threads[i]),&attr,thrRun,this);//建立多個執行緒
}
}
static void* thrRun(void* arg)
{
ThreadPool *pool = (ThreadPool*)arg;
int taskpos = 0;//任務位置
PoolTask *task = new PoolTask();
while(1)
{
//獲取任務,先要嘗試加鎖
pthread_mutex_lock(&pool->pool_lock);
//無任務並且執行緒池不是要摧毀
while(pool->job_num <= 0 && !pool->shutdown )
{
//如果沒有任務,執行緒會阻塞
pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);
}
if(pool->job_num)
{
//有任務需要處理
taskpos = (pool->job_pop++)%pool->max_job_num;
//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
//為什麼要拷貝?避免任務被修改,生產者會新增任務
memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));
task->arg = task;
pool->job_num--;
//task = &thrPool->tasks[taskpos];
pthread_cond_signal(&pool->empty_task);//通知生產者
}
if(pool->shutdown)
{
//代表要摧毀執行緒池,此時執行緒退出即可
//pthread_detach(pthread_self());//臨死前分家
pthread_mutex_unlock(&pool->pool_lock);
delete(task);
pthread_exit(NULL);
}
//釋放鎖
pthread_mutex_unlock(&pool->pool_lock);
task->task_func(task->arg);//執行回撥函數
}
}
//解構函式,摧毀執行緒池
~ThreadPool()
{
this->shutdown = 1;//關閉執行緒池
pthread_cond_broadcast(&(this->not_empty_task));//誘殺
int i = 0;
for(i = 0; i<this->thr_num ; i++)
{
pthread_join(this->threads[i],NULL);
}
pthread_cond_destroy(&(this->not_empty_task));
pthread_cond_destroy(&(this->empty_task));
pthread_mutex_destroy(&(this->pool_lock));
delete []tasks;
tasks = NULL;
free(this->threads);
}
public:
//新增任務到執行緒池
void addtask()
{
pthread_mutex_lock(&(this->pool_lock));
cout << "當前任務佇列中任務的個數是: " <<this-> job_num <<endl;
//實際任務總數大於最大任務個數則阻塞等待(等待任務被處理)
while(this->max_job_num <= this->job_num)
{
pthread_cond_wait(&(this->empty_task),&(this->pool_lock));
}
int taskpos = (this->job_push++)%this->max_job_num;
this->tasks[taskpos].tasknum = beginnum++;
this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];
this->tasks[taskpos].task_func = taskRun;
this->job_num++;
pthread_mutex_unlock(&(this->pool_lock));
pthread_cond_signal(&(this->not_empty_task));//通知包身工
}
private:
//任務佇列相關的引數
int max_job_num;//最大任務個數
int job_num;//實際任務個數
PoolTask *tasks;//任務佇列陣列
int job_push;//入隊位置
int job_pop;// 出隊位置
//執行緒相關引數
int thr_num;//執行緒池內執行緒個數
pthread_t *threads;//執行緒池內執行緒陣列
int shutdown;//是否關閉執行緒池
pthread_mutex_t pool_lock;//執行緒池的鎖
pthread_cond_t empty_task;//任務佇列為空的條件
pthread_cond_t not_empty_task;//任務佇列不為空的條件
};
int main()
{
ThreadPool *m = new ThreadPool(3,20);
int j = 0;
for(j=0;j<20;j++)
{
m->addtask();
}
sleep(20);
delete m;
m = NULL;
system("pause");
return EXIT_SUCCESS;
}
執行結果如下:
可以看到執行緒最多處理三個任務,而任務佇列中最多可以存在20個任務,當執行緒取走了任務之後,喚醒生產者繼續新增任務。
首先封裝一個任務:
class Task
{
public:
Task(int a = 0, int b = 0)
:_a(a)
,_b(b)
{}
void Run()
{
//執行的任務可以自己編寫
}
private:
int _a;
int _b;
};
執行緒池的主要程式碼框架(喚醒和等待操作都已經封裝好):
#define DEFAULT_MAX_PTHREAD 5
class ThreadPool
{
public:
ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD)
:_max_thread(max_pthread)
{}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
public:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadWait()
{
pthread_cond_wait(&_cond, &_mutex);
}
void WakeUpThread()
{
pthread_cond_signal(&_cond);
//pthread_cond_broadcast(&_cond);
}
bool IsEmpty()
{
return _q.empty();
}
private:
queue<Task*> _q;
int _max_thread;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
建立多個執行緒
建立多個執行緒可以用一個迴圈進行建立。需要注意的是,建立一個執行緒還需要提供一個執行緒啟動後要執行的函數,這個啟動函數只能有一個引數。如果把這個函數設定為成員函數,那麼這個函數的第一個引數預設是this指標,這樣顯然是不可行的,所以這裡我們考慮把這個啟動函數設定為靜態的。但是設定為靜態的成員函數又會面臨一個問題:如何呼叫其他成員函數和成員變數? 所以這裡我們考慮建立執行緒的時候,把this指標傳過去,讓啟動函數的arg 引數去接收即可
static void* Runtine(void* arg)
{
pthread_detach(pthread_self());
ThreadPool* this_p = (ThreadPool*)arg;
while (1){
this_p->LockQueue();
while (this_p->IsEmpty()){
this_p->ThreadWait();
}
Task* t;
this_p->Get(t);
this_p->UnlockQueue();
// 解鎖後處理任務
t->Run();
delete t;
}
}
void ThreadPoolInit()
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
pthread_t t[_max_thread];
for(int i = 0; i < _max_thread; ++i)
{
pthread_create(t + i, nullptr, Runtine, this);
}
}
注意: 執行緒建立後,執行啟動函數,在這個函數中,執行緒會去任務佇列中取任務並處理,取任務前需要進行加鎖的操作(如果佇列為空需要掛起等待),取完任務然後進行解鎖,然後處理任務,讓其它執行緒去任務佇列中取任務
放任務: 主執行緒無腦往任務佇列中塞任務,塞任務之前進行加鎖,塞完任務解鎖,然後喚醒在條件變數下等待的佇列
取任務: 執行緒池中的執行緒從任務佇列中取任務,這裡不需要加鎖,因為這個動作在啟動函數中加鎖的那一段區間中被呼叫的,其實已經上鎖了
// 放任務
void Put(Task* data)
{
LockQueue();
_q.push(data);
UnlockQueue();
WakeUpThread();
}
// 取任務
void Get(Task*& data)
{
data = _q.front();
_q.pop();
}
這兩個版本都可以實現簡易的執行緒池,下面執行緒池版本的伺服器主要是用版本二來實現,因為版本一要修改的內容有點多,小夥伴們可以自己修改一下
多執行緒版本效果看起來還不錯,但是來一個連線就建立一個執行緒,斷開一個連線就釋放一個執行緒,這樣頻繁地建立和釋放執行緒資源,對OS來說是一種負擔,同時也帶來資源的浪費,如果我們使用執行緒池,把每一個使用者端連線封裝成一個任務,讓執行緒池去處理,這樣就不需要頻繁地建立和銷燬消除,效率也能提升很多。
執行緒池採用版本二,程式碼如下:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Task.hpp"
#define DEFAULT_MAX_PTHREAD 5
class ThreadPool
{
public:
ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD)
:_max_thread(max_pthread)
{}
static void* Runtine(void* arg)
{
pthread_detach(pthread_self());
ThreadPool* this_p = (ThreadPool*)arg;
while (1){
this_p->LockQueue();
while (this_p->IsEmpty()){
this_p->ThreadWait();
}
Task* t;
this_p->Get(t);
this_p->UnlockQueue();
// 解鎖後處理任務
t->Run();
delete t;
}
}
void ThreadPoolInit()
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
pthread_t t[_max_thread];
for(int i = 0; i < _max_thread; ++i)
{
pthread_create(t + i, nullptr, Runtine, this);
}
}
void Put(Task* data)
{
LockQueue();
_q.push(data);
UnlockQueue();
WakeUpThread();
}
void Get(Task*& data)
{
data = _q.front();
_q.pop();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
public:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadWait()
{
pthread_cond_wait(&_cond, &_mutex);
}
void WakeUpThread()
{
pthread_cond_signal(&_cond);
//pthread_cond_broadcast(&_cond);
}
bool IsEmpty()
{
return _q.empty();
}
private:
std::queue<Task*> _q;
int _max_thread;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
這裡我們單獨寫一個標頭檔案——Task.hpp,其中有任務類,任務類裡面有三個成員變數,也就是埠號,IP和通訊端,其中有一個成員方法——Run,裡面封裝了一個Service函數,也就是前面寫的,把它放在Task.hpp這個標頭檔案下,執行緒池裡面的執行緒執行run函數即可,標頭檔案內容如下:
#pragma once
#include <iostream>
#include <unistd.h>
static void Service(std::string ip, int port, int sock)
{
while (1){
char buf[256];
ssize_t size = read(sock, buf, sizeof(buf)-1);
if (size > 0){
// 正常讀取size位元組的資料
buf[size] = 0;
std::cout << "[" << ip << "]:[" << port << "]# "<< buf << std::endl;
std::string msg = "server get!-> ";
msg += buf;
write(sock, msg.c_str(), msg.size());
}
else if (size == 0){
// 對端關閉
std::cout << "[" << ip << "]:[" << port << "]# close" << std::endl;
break;
}
else{
// 出錯
std::cerr << sock << "read error" << std::endl;
break;
}
}
close(sock);
std::cout << "service done" << std::endl;
}
struct Task
{
int _port;
std::string _ip;
int _sock;
Task(int port, std::string ip, int sock)
:_port(port)
,_ip(ip)
,_sock(sock)
{}
void Run()
{
Service(_ip, _port, _sock);
}
};
伺服器類的核心程式碼如下:
void loop()
{
struct sockaddr_in peer;// 獲取遠端埠號和ip資訊
socklen_t len = sizeof(peer);
_tp = new ThreadPool(THREAD_NUM);
_tp->ThreadPoolInit();
while (1){
// 獲取連結
// sock 是進行通訊的一個通訊端 _listen_sock 是進行監聽獲取連結的一個通訊端
int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);
if (sock < 0){
std::cout << "accept fail, continue accept" << std::endl;
continue;
}
int peerPort = ntohs(peer.sin_port);
std::string peerIp = inet_ntoa(peer.sin_addr);
std::cout << "get a new link, [" << peerIp << "]:[" << peerPort << "]"<< std::endl;
Task* task = new Task(peerPort, peerIp, sock);
_tp->Put(task);
}
}
注意幾點變化: