概念: 生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過一個來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。
生產消費者模型優點:
生產消費模型特徵(簡記321):
阻塞佇列的特點:
因為交易場所是一個阻塞佇列,所以,我封裝了一個BlcokingQueue 的類,這裡提供了放資料和取資料這樣兩個主要的方法。其中,有五個成員變數:
建構函式和解構函式:
建構函式做初始化和資源分配的操作,分配鎖資源和條件變數
解構函式做清理資源的操作,對鎖和條件變數進行銷燬
#include<iostream>
#include<string.h>
using namespace std;//標準名稱空間
//類別範本
template<class T>
class BlockQueue
{
public:
//建構函式,化容量為5
BlockQueue(int capacity = 5)
:_capacity(capacity)
{
//初始化鎖和互斥量
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_c_cond,nullptr);
pthread_cond_init(&_p_cond,nullptr);
}
//解構函式
~BlockQueue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
private:
//佇列容器
queue<T> _q;
size_t _capacity;//佇列最大容器
pthread_mutex_t _lock;//互斥鎖
pthread_cond_t _c_cond;//消費者被喚醒和掛起的條件變數
pthread_cond_t _p_cond;//生產者被喚醒和掛起的條件變數
}
我對阻塞佇列的一些基本操作進行了封裝,有以下幾個處理動作(可以設定為私有方法):
private:
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.empty();
}
void ConsumerWait()
{
cout << "consumer wait...." << endl;
pthread_cond_wait(&_c_cond, &_lock);
}
void WakeUpConsumer()
{
cout << "wake up consumer...." << endl;
pthread_cond_broadcast(&_c_cond);
}
void ProductorWait()
{
cout << "productor wait...." << endl;
pthread_cond_wait(&_p_cond, &_lock);
}
void WakeUpProductor()
{
cout << "wake up productor...." << endl;
pthread_cond_broadcast(&_p_cond);
}
void LockQueue()
{
pthread_mutex_lock(&_lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_lock);
}
void ProductData(T data)
{
LockQueue();
while (IsFull()){
// 讓productor掛起等待
ProductorWait();
}
// 放資料
_q.push(data);
UnLockQueue();
// 喚醒consumer
WakeUpConsumer();
}
void ConsumeData(T& data)
{
LockQueue();
while (IsEmpty()){
// 讓consumer掛起等待
ConsumerWait();
}
// 取資料
data = _q.front();
_q.pop();
UnLockQueue();
// 喚醒productor
WakeUpProductor();
}
注意: 在臨界資源判斷喚醒條件是否就緒應該使用while迴圈檢測,被喚醒的執行緒並不著急立即往下執行,而是再進行一次檢測,判斷當前喚醒條件是否真的就緒了。因為喚醒執行緒的這個函數呼叫可能會發生失敗,且執行緒可能是在條件不滿足的時候被喚醒,發生誤判被偽喚醒。
我們可以實現一個任務類,生產者把這個任務放進阻塞佇列中,消費者取出並進行處理。其中還有一個run的任務執行方法
class Task
{
public:
Task(int a = 0, int b = 0)
:_a(a)
,_b(b)
{}
int Run()
{
return _a + _b;
}
int GetA()
{
return _a;
}
int GetB()
{
return _b;
}
private:
int _a;
int _b;
};
BlockQueue<Task>* q;// 阻塞佇列
void* Consumer(void* arg)
{
long id = (long)arg;
while (1){
// 消費(取)資料
Task t(0, 0);
q->ConsumeData(t);
cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << endl;
sleep(1);// 後面可註釋,調整速度
}
}
void* Productor(void* arg)
{
long id = (long)arg;
while (1){
// 生產(放)資料
int x = rand()%10 + 1;
int y = rand()%10 + 1;
Task t(x, y);
cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << endl;
q->ProductData(t);
sleep(1);// 後面可註釋,調整速度
}
}
int main()
{
srand((size_t)time(nullptr));
// 建立一個交易場所
q = new BlockQueue<Task>;
pthread_t p, c;
pthread_create(&p, nullptr, Productor, (void*)(1));
pthread_create(&c, nullptr, Consumer, (void*)(1));
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete q;
return 0;
}
程式碼執行的結果分三種情況分析:
1.消費者和生產者以相同的速度執行
可以看出的是,生產者生成完一個任務,消費者就處理了一個任務。接著生產者繼續生產,消費者跟著處理,二者步調一致,一直並行的狀態
2.生產者快,消費者慢
生產者生產速度快,導致一上來生產者就把佇列塞滿了任務,接著喚醒消費者來消費,然後掛起等待。緊接著消費者處理完一個任務就喚醒生產者來生產,生產者生產了一個任務就喊消費者來消費,然後繼續掛起。可以看出的是,在這種情況下,佇列長時間是滿的,生產者大部分時間是掛起等待的。生產者和消費者開始一小部分時間內步調是不一致的,生產者生產完,消費者才消費是序列的,但是過了一會,二者步調就變得一致了,且速度是隨消費者的
3.生產者慢,消費者快
生產者的速度慢,生產者生產一個任務立馬喚醒消費者,消費者處理完一個資料,發現佇列為空,然後掛起等待,接著生產者繼續生產一個任務,然後喚醒消費者。可以看出的是,佇列大部分時間是為空的,消費者大部分時間是處於掛起等待的,二者步調一直是一致的,且執行速度是隨生產者的,也是並行的
做到幾點:
#define P_COUNT 3
#define C_COUNT 3
BlockQueue<Task>* q;
pthread_mutex_t c_lock;// 消費者的鎖
pthread_mutex_t p_lock;// 生產者的鎖
void* Consumer(void* arg)
{
long id = (long)arg;
while (1){
pthread_mutex_lock(&c_lock);
// 消費(取)資料
Task t(0, 0);
q->ConsumeData(t);
cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << endl;
pthread_mutex_unlock(&c_lock);
sleep(1);
}
}
void* Productor(void* arg)
{
long id = (long)arg;
while (1){
pthread_mutex_lock(&p_lock);
// 生產(放)資料
int x = rand()%10 + 1;
int y = rand()%10 + 1;
Task t(x, y);
cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << endl;
q->ProductData(t);
pthread_mutex_unlock(&p_lock);
sleep(1);
}
}
int main()
{
srand((size_t)time(nullptr));
pthread_mutex_init(&c_lock, nullptr);
pthread_mutex_init(&p_lock, nullptr);
// 建立一個交易場所
q = new BlockQueue<Task>;
pthread_t p[P_COUNT];
pthread_t c[C_COUNT];
for (long i = 0; i < P_COUNT; ++i)
{
pthread_create(p+i, nullptr, Productor, (void*)(i+1));
}
for (long i = 0; i < C_COUNT; ++i)
{
pthread_create(c+i, nullptr, Consumer, (void*)(i+1));
}
for (int i = 0; i < C_COUNT; ++i)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < P_COUNT; ++i)
{
pthread_join(p[i], nullptr);
}
pthread_mutex_destroy(&c_lock);
pthread_mutex_destroy(&p_lock);
delete q;
return 0;
}
注意:
執行結果如下:
POSIX號誌: 該號誌允許程序和執行緒同步對共用資源的存取。同時也可以用於實現執行緒間同步。
總結幾點:
下面要介紹的POSIX號誌相關介面都是在semaphore.h
的標頭檔案中。號誌是一個型別為sem_t
的變數
int sem_init(sem_t *sem, int pshared, unsigned int value);
功能:
建立一個號誌並初始化它的值。一個無名號誌在被使用前必須先初始化
引數:
sem:號誌的地址
pshared:等於0,號誌線上程間共用(常用)。不等於0,號誌在程序間共用
value:號誌的初始值
返回值:
成功:0
失敗:-1
號誌的初始值就是可用資源的個數
int sem_destroy(sem_t *sem);
功能:
刪除sem標識的號誌
引數:
sem:信量地址
返回值:
成功:0
失敗:-1
int sem_wait(sem_t *sem);
功能:
將號誌的值減1。操作前,先檢查號誌(sem)的值是否為0,若號誌為0,此函數會阻塞,直到號誌大於0時才進行減一操作
引數:
sem:號誌的地址
返回值:
成功:0
失敗:-1
int sem_trywait(sem_t *sem);
以非阻塞的方式來對號誌進行減1操作
若操作前,號誌的值等於0,則對號誌的操作失敗,函數立即返回
int sem_timedwait(sem_t* sem,const struct timespec *abs_timeout);
限時嘗試將號誌的值減1
abs_timeout:絕對時間
int sem_post(sem_t *sem);
功能:
將號誌的值加1並行出訊號喚醒等待執行緒(sem_wait())
引數:
sem:號誌的地址。
返回值:
成功:0
int sem_getvalue(sem_t *sem,int *Sval);
功能:
獲取sem標識的號誌的值,儲存在sval中
引數:
sem:號誌地址
sval:儲存訊號值的地址
返回值:
成功:0
失敗:-1
舉個簡單的例子:
初始化號誌的資源數是1,意思就是系統中只有一份資源,執行緒1如果要申請這份資源,首先要進行p操作,sem_wait()函數,那麼資源數-1,此時資源數是0,說明系統中沒有可用的資源,其他執行緒在進行p操作的時候就會阻塞,執行緒1執行完畢,執行v操作,資源數+1,系統中的資源被釋放出來,執行緒2此時解除阻塞狀態,步驟和執行緒1一樣。
程式碼範例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
sem_t sem;
void* run1(void* arg)
{
while (1){
sem_wait(&sem);
cout << "run1 is running..." << endl;
sem_post(&sem);
sleep(1);
}
}
void* run2(void* arg)
{
while (1){
sem_wait(&sem);
cout << "run2 is running..." << endl;
sem_post(&sem);
sleep(1);
}
}
int main()
{
sem_init(&sem, 0, 1);
pthread_t t1, t2;
pthread_create(&t1, nullptr, run1, nullptr);
pthread_create(&t2, nullptr, run2, nullptr);
sem_destroy(&sem);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
執行結果如下:
注意:當號誌的初始值為1的時候,就相當於一把互斥鎖
環形佇列: 環形佇列和普通佇列的區別就是,這種佇列是一種環形的結構,有一個頭指標和一個尾指標維護環中一小段佇列。
環形結構起始狀態和結束狀態都是一樣的,不好判斷為空或者為滿,所以可以通過加計數器或者標記位來判斷滿或者空。另外也可以預留一個空的位置,作為滿的狀態。
這就類似於資料結構中學的迴圈佇列類似。
因為號誌就是一個計數器,所以我們可以通過號誌來實現多執行緒間的同步過程。
一個交易場所: 迴圈佇列
兩個角色:
幾個變數成員:
程式碼範例:
#include<iostream>
#include<string.h>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
#include<unistd.h>
using namespace std;
//生產者:需要申請空間資源(P操作),然後釋放資料資源(V操作)
//消費者:需要申請空間資源(P操作),然後釋放空間資源(V操作)
template<class T>
class RingQueue
{
public:
RingQueue(int capacity = 5):_capacity(capacity),_rq(capacity),_c_index(0),_p_index(0)
{
//初始化空間資源號誌,容量為5
sem_init(&_blank_sem,0,_capacity);
sem_init(&_data_sem,0,0);
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
}
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
public:
void GetData(T& data)
{
// consumer申請資料資源
P(_data_sem);
data = _rq[_c_index];
_c_index = (_c_index + 1) % _capacity;
// consumer釋放格子資源
V(_blank_sem);
}
void PutData(const T& data)
{
// productor申請格子資源
P(_blank_sem);
_rq[_p_index] = data;
_p_index = (_p_index + 1) % _capacity;
// productor釋放資料資源
V(_data_sem);
}
private:
//vector容器模擬佇列
vector<T> _rq;
//佇列的容量
size_t _capacity;
//空間資源號誌
sem_t _blank_sem;
//資料資源號誌
sem_t _data_sem;
//消費者的下標位置:開始為0
int _c_index;
//生產者的下標位置:開始為0
int _p_index;
};
RingQueue<int> *q ;
void* Consumer(void* arg)
{
long id = (long)arg;
while (1){
// 消費(取)資料
int x;
q->GetData(x);
cout << "consumer " << id << " consumes a data: " << x << endl;
sleep(1);// 後面可註釋,調整速度
}
}
void* Productor(void* arg)
{
long id = (long)arg;
while (1){
// 生產(放)資料
int x = rand()%10 + 1;
q->PutData(x);
cout << "productor " << id << " produncs a data: " << x<< endl;
sleep(1);// 後面可註釋,調整速度
}
}
int main()
{
// 建立一個交易場所
q = new RingQueue<int>();
srand((size_t)time(nullptr));
pthread_t p, c;
pthread_create(&p, nullptr, Productor, (void*)(1));
pthread_create(&c, nullptr, Consumer, (void*)(1));
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete q;
return EXIT_SUCCESS;
}
注意:生產者生成資料前需要申請空間資源號誌(P(_blank_sem)),申請不成功就掛起等待,等待號誌來了繼續獲得號誌,然後釋放資料資源號誌(V(_data_sem))
消費者消費資料前需要申請資料資源號誌(P(_data_sem)),申請不成功就掛起等待,等待號誌來了繼續獲得號誌,然後釋放空間資源號誌(V(_blank_sem))
執行結果如下:
生產者生產完一個資料,然後消費者就消費了,二者步調一致,並行執行。
生產者速度快,一下字就把佇列塞滿了資料(開始時二者步調不一致),接著生產者如果再去申請空間號誌,此時已經申請不到了,只能掛起等待,消費者消費資料是否空間號誌,這是生產者才可以繼續生產,可以看出,在後面大部分時間,二者步調恢復一致了,且速度隨消費者。
生產生產者生產完一個資料,資料號誌加1,空間號誌減1,然後消費者裡馬消費了一個資料,資料號誌減1,空間號誌加1,此時資料號誌為0,消費者再去申請資料號誌,申請不到就掛起等待,只能等生產者在去生產釋放空間號誌,然後消費者才可以申請到。可以看出的是,佇列長時間是空的,二者步調一致,速度隨生產者。