Linux--多執行緒(三)

2022-11-02 06:01:31

生產者消費者模型

概念: 生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過一個來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。

生產消費者模型優點:

  • 解耦:生產者和消費者是通過一個共用資料區域來進行通訊。而不是直接進行通訊,這樣兩個角色之間的依耐性就降低了(程式碼層面實現解耦),變成了角色與共用資料區域之間的弱耦合,一個邏輯出錯不影響兩一個邏輯,二者變得更獨立。
  • 支援並行:生產者負責生產資料,消費者負責拿資料。生產者生產完資料可以繼續生產,大部分時間內是不需要等待消費者消費資料才繼續生產。也就是說,在任一時刻,二者都是在正常處理任務的,進度都得以推進。
  • 支援忙閒下不均:生產者生產了資料是放進容器中,消費者不必立即消費,可以慢慢地從容器中取資料。容器快要空了,消費者的消費速度就可以降下來,讓生產者繼續生產。

生產消費模型特徵(簡記321):

  1. 3種關係: 生產者與生產者(互斥)、生產者與消費者(同步(主要)和互斥)和消費者與消費者(互斥)
  2. 兩個角色: 生產者和消費者
  3. 一個交易場所: 容器、共用資源等

基於阻塞佇列的生產者消費者模型

阻塞佇列的特點:

  • 當佇列為空時,從佇列獲取元素的操作將會被阻塞,直到佇列中被放入了元素
  • 當佇列滿時,往佇列裡存放元素的操作也會被阻塞,直到有元素被從佇列中取出(以上的操作都是基於不同的執行緒來說的,執行緒在對阻塞佇列程序操作時會被阻塞)
  • 在這個模型中,生產者和消費者的交易場所就是阻塞佇列

實現

概述:

因為交易場所是一個阻塞佇列,所以,我封裝了一個BlcokingQueue 的類,這裡提供了放資料和取資料這樣兩個主要的方法。其中,有五個成員變數:

  • 佇列: 使用STL中的queue來實現
  • 容量: 阻塞佇列的容量,由使用者給定,我們也可以提供一個預設的容量
  • 互斥量: 為了實現生產者和消費者的同步,我們需要使用條件變數和互斥量來實現同步的操作
  • 生產者喚醒和等待的條件變數: 當佇列滿了,生產者等待條件滿足,應該掛起等待,等待消費者喚醒
  • 消費者喚醒和等待的條件變數: 當佇列為空,消費者等待條件滿足,應該掛起等待,等待生產者喚醒

建構函式和解構函式:

  • 建構函式做初始化和資源分配的操作,分配鎖資源和條件變數

  • 解構函式做清理資源的操作,對鎖和條件變數進行銷燬

佇列類的整體架構

#include<iostream>
#include<string.h>
using namespace std;//標準名稱空間
//類別範本
template<class T>
class BlockQueue
{
public:
    //建構函式,化容量為5
    BlockQueue(int capacity = 5)
    :_capacity(capacity)
    {
      //初始化鎖和互斥量
      pthread_mutex_init(&_lock,nullptr);
      pthread_cond_init(&_c_cond,nullptr);
      pthread_cond_init(&_p_cond,nullptr);
    }
    //解構函式
    ~BlockQueue()
    {
      pthread_mutex_destroy(&_lock);
      pthread_cond_destroy(&_c_cond);
      pthread_cond_destroy(&_p_cond);
    }
private:
    //佇列容器
    queue<T> _q;
    size_t   _capacity;//佇列最大容器
    pthread_mutex_t _lock;//互斥鎖
    pthread_cond_t _c_cond;//消費者被喚醒和掛起的條件變數
    pthread_cond_t _p_cond;//生產者被喚醒和掛起的條件變數
}

基本方法的封裝

我對阻塞佇列的一些基本操作進行了封裝,有以下幾個處理動作(可以設定為私有方法):

  • 判斷佇列為空或為滿
  • 喚醒生產者和喚醒消費者
  • 生產者掛起等待和消費者掛起等待
  • 加鎖和解鎖
private:
  bool IsFull()
  {
    return _q.size() == _capacity;
  }
  bool IsEmpty()
  {
    return _q.empty();
  }
  void ConsumerWait()
  {
    cout << "consumer wait...." << endl;
    pthread_cond_wait(&_c_cond, &_lock);
  }
  void WakeUpConsumer()
  {
    cout << "wake up consumer...." << endl;
    pthread_cond_broadcast(&_c_cond);
  }
  void ProductorWait()
  {
    cout << "productor wait...." << endl;
    pthread_cond_wait(&_p_cond, &_lock);
  }
  void WakeUpProductor()
  {
    cout << "wake up productor...." << endl;
    pthread_cond_broadcast(&_p_cond);
  }
  void LockQueue()
  {
    pthread_mutex_lock(&_lock);
  }
  void UnLockQueue()
  {
    pthread_mutex_unlock(&_lock);
  }

放資料和取資料

  • 生產者進行相關操作前先上鎖,佇列如果為滿就需要掛起等待。佇列不為滿就生成一個資料,然後需要把資料放入阻塞佇列中,解開鎖之後喚醒消費者,喊消費者進行消費。
  • 消費者進行相關操作前先上鎖,佇列如果為空就需要掛起等待。佇列不為空就需要從阻塞佇列中取一個資料,然後解開鎖之後喚醒消費者,喊生產者進行生產。
void ProductData(T data)
{
  LockQueue();
  while (IsFull()){
    // 讓productor掛起等待
    ProductorWait();
  }
  // 放資料
  _q.push(data);
  UnLockQueue();
  // 喚醒consumer
  WakeUpConsumer();
}
void ConsumeData(T& data)
{
  LockQueue();
  while (IsEmpty()){
    // 讓consumer掛起等待
    ConsumerWait();
  }
  // 取資料
  data = _q.front();
  _q.pop();
  UnLockQueue();
  // 喚醒productor
  WakeUpProductor();
}

注意: 在臨界資源判斷喚醒條件是否就緒應該使用while迴圈檢測,被喚醒的執行緒並不著急立即往下執行,而是再進行一次檢測,判斷當前喚醒條件是否真的就緒了。因為喚醒執行緒的這個函數呼叫可能會發生失敗,且執行緒可能是在條件不滿足的時候被喚醒,發生誤判被偽喚醒。

封裝一個任務

我們可以實現一個任務類,生產者把這個任務放進阻塞佇列中,消費者取出並進行處理。其中還有一個run的任務執行方法

class Task
{
public:
  Task(int a = 0, int b = 0)
    :_a(a)
    ,_b(b)
  {}
  int Run()
  {
    return _a + _b;
  }
  int GetA()
  {
    return _a;
  }
  int GetB()
  {
    return _b;
  }
private:
  int _a;
  int _b;
};

單生產者和單消費者模型分析

BlockQueue<Task>* q;// 阻塞佇列

void* Consumer(void* arg)
{
  long id = (long)arg;
  while (1){
    // 消費(取)資料
    Task t(0, 0);
    q->ConsumeData(t);
    cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << endl;
    sleep(1);// 後面可註釋,調整速度
  }
}
void* Productor(void* arg)
{
  long id = (long)arg;
  while (1){
    // 生產(放)資料
    int x = rand()%10 + 1;
    int y = rand()%10 + 1;
    Task t(x, y);
    cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << endl;
    q->ProductData(t);
    sleep(1);// 後面可註釋,調整速度
  }
}
int main()
{
  srand((size_t)time(nullptr));

  // 建立一個交易場所
  q =  new BlockQueue<Task>;
  
  pthread_t p, c;
  pthread_create(&p, nullptr, Productor, (void*)(1));
  pthread_create(&c, nullptr, Consumer, (void*)(1));
  
  pthread_join(p, nullptr);
  pthread_join(c, nullptr);
   
  delete q;
  return 0;
}

程式碼執行的結果分三種情況分析:

1.消費者和生產者以相同的速度執行

可以看出的是,生產者生成完一個任務,消費者就處理了一個任務。接著生產者繼續生產,消費者跟著處理,二者步調一致,一直並行的狀態

2.生產者快,消費者慢

生產者生產速度快,導致一上來生產者就把佇列塞滿了任務,接著喚醒消費者來消費,然後掛起等待。緊接著消費者處理完一個任務就喚醒生產者來生產,生產者生產了一個任務就喊消費者來消費,然後繼續掛起。可以看出的是,在這種情況下,佇列長時間是滿的,生產者大部分時間是掛起等待的。生產者和消費者開始一小部分時間內步調是不一致的,生產者生產完,消費者才消費是序列的,但是過了一會,二者步調就變得一致了,且速度是隨消費者的

3.生產者慢,消費者快

生產者的速度慢,生產者生產一個任務立馬喚醒消費者,消費者處理完一個資料,發現佇列為空,然後掛起等待,接著生產者繼續生產一個任務,然後喚醒消費者。可以看出的是,佇列大部分時間是為空的,消費者大部分時間是處於掛起等待的,二者步調一直是一致的,且執行速度是隨生產者的,也是並行的

多生產者和多消費者模型分析

做到幾點:

  • 生產者之間需要互斥,也就是生產者和生產者之間需要組內競爭一把鎖,消費者也是如此
  • 生產者和消費者之間用互斥量和條件變數做到同步和互斥(上面就做到了)
#define P_COUNT 3
#define C_COUNT 3

BlockQueue<Task>* q;

pthread_mutex_t c_lock;// 消費者的鎖
pthread_mutex_t p_lock;// 生產者的鎖

void* Consumer(void* arg)
{
  long id = (long)arg;

  while (1){
    pthread_mutex_lock(&c_lock);
    // 消費(取)資料
    Task t(0, 0);
    q->ConsumeData(t);
    cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << endl;
    pthread_mutex_unlock(&c_lock);
    sleep(1);
  }
}

void* Productor(void* arg)
{
  long id = (long)arg;
  while (1){
    pthread_mutex_lock(&p_lock);
    // 生產(放)資料
    int x = rand()%10 + 1;
    int y = rand()%10 + 1;
    Task t(x, y);
    cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << endl;
    q->ProductData(t);
    pthread_mutex_unlock(&p_lock);
    sleep(1);
  }
}
int main()
{
  srand((size_t)time(nullptr));
  pthread_mutex_init(&c_lock, nullptr);
  pthread_mutex_init(&p_lock, nullptr);

  // 建立一個交易場所
  q =  new BlockQueue<Task>;

  pthread_t p[P_COUNT];
  pthread_t c[C_COUNT];

  for (long i = 0; i < P_COUNT; ++i)
  {
    pthread_create(p+i, nullptr, Productor, (void*)(i+1));
  }
  for (long i = 0; i < C_COUNT; ++i)
  {
    pthread_create(c+i, nullptr, Consumer, (void*)(i+1));
  }
  
  for (int i = 0; i < C_COUNT; ++i)
  {
    pthread_join(c[i], nullptr);
  }
  for (int i = 0; i < P_COUNT; ++i)
  {
    pthread_join(p[i], nullptr);
  }
  
  pthread_mutex_destroy(&c_lock);
  pthread_mutex_destroy(&p_lock);
  
  delete q;
  return 0;
}

注意:

  • 生產者之間需要一個互斥量,消費者之間也需要一個互斥量
  • 生產者和消費者的個數可以自己調整宏變數

執行結果如下:

POSIX號誌

介紹

POSIX號誌: 該號誌允許程序和執行緒同步對共用資源的存取。同時也可以用於實現執行緒間同步。

總結幾點:

  • 是什麼? 號誌本質是一個計數器,描述臨界資源的有效個數。申請一個資源就對號誌減1(P操作),釋放一個資源就對號誌加1(V操作)
  • 為什麼? 臨界資源可以看成很多份,互相不衝突且高效
  • 怎麼用? 可以使用號誌的相關介面,來申請號誌和釋放號誌(下面詳細介紹)

相關介面的介紹

下面要介紹的POSIX號誌相關介面都是在semaphore.h的標頭檔案中。號誌是一個型別為sem_t的變數

  • 初始化號誌
int sem_init(sem_t *sem, int pshared, unsigned int value); 
功能:
	建立一個號誌並初始化它的值。一個無名號誌在被使用前必須先初始化
引數:
	sem:號誌的地址
	pshared:等於0,號誌線上程間共用(常用)。不等於0,號誌在程序間共用
	value:號誌的初始值
返回值:
	成功:0
	失敗:-1
	
號誌的初始值就是可用資源的個數
  • 銷燬號誌
int sem_destroy(sem_t *sem);
功能:
	刪除sem標識的號誌
引數:
	sem:信量地址
返回值:
	成功:0
	失敗:-1
  • 號誌P操作(減一)
int sem_wait(sem_t *sem);
功能:
	將號誌的值減1。操作前,先檢查號誌(sem)的值是否為0,若號誌為0,此函數會阻塞,直到號誌大於0時才進行減一操作
引數:
	sem:號誌的地址
返回值:
	成功:0
	失敗:-1

int sem_trywait(sem_t *sem);
	以非阻塞的方式來對號誌進行減1操作
	若操作前,號誌的值等於0,則對號誌的操作失敗,函數立即返回
int sem_timedwait(sem_t* sem,const struct timespec *abs_timeout);
	限時嘗試將號誌的值減1
	abs_timeout:絕對時間
  • 號誌V操作(加一)
int sem_post(sem_t *sem);
功能:
	將號誌的值加1並行出訊號喚醒等待執行緒(sem_wait())
引數:
	sem:號誌的地址。
返回值:
成功:0
  • 獲取號誌的值
int sem_getvalue(sem_t *sem,int *Sval);
功能:
	獲取sem標識的號誌的值,儲存在sval中
引數:
	sem:號誌地址
	sval:儲存訊號值的地址
返回值:
	成功:0
	失敗:-1

舉個簡單的例子:

初始化號誌的資源數是1,意思就是系統中只有一份資源,執行緒1如果要申請這份資源,首先要進行p操作,sem_wait()函數,那麼資源數-1,此時資源數是0,說明系統中沒有可用的資源,其他執行緒在進行p操作的時候就會阻塞,執行緒1執行完畢,執行v操作,資源數+1,系統中的資源被釋放出來,執行緒2此時解除阻塞狀態,步驟和執行緒1一樣。

程式碼範例:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
sem_t sem;
void* run1(void* arg)
{
  while (1){
    sem_wait(&sem);
    cout << "run1 is running..." << endl;
    sem_post(&sem);
    sleep(1);
  }
}
void* run2(void* arg)
{
  while (1){
    sem_wait(&sem);
    cout << "run2 is running..." << endl;
    sem_post(&sem);
    sleep(1);
  }
}
int main()
{
  sem_init(&sem, 0, 1);
  pthread_t t1, t2;
  pthread_create(&t1, nullptr, run1, nullptr);
  pthread_create(&t2, nullptr, run2, nullptr);

  sem_destroy(&sem);
  pthread_join(t1, nullptr);
  pthread_join(t2, nullptr);
  return 0;
}

執行結果如下:

注意:當號誌的初始值為1的時候,就相當於一把互斥鎖

基於環形佇列的生產消費模型

環形佇列介紹

環形佇列: 環形佇列和普通佇列的區別就是,這種佇列是一種環形的結構,有一個頭指標和一個尾指標維護環中一小段佇列。

環形結構起始狀態和結束狀態都是一樣的,不好判斷為空或者為滿,所以可以通過加計數器或者標記位來判斷滿或者空。另外也可以預留一個空的位置,作為滿的狀態。

這就類似於資料結構中學的迴圈佇列類似。

因為號誌就是一個計數器,所以我們可以通過號誌來實現多執行緒間的同步過程。

實現

一個交易場所: 迴圈佇列
兩個角色

  • 生產者:需要申請空間資源(P操作),然後釋放資料資源(V操作)
  • 消費者:需要申請資料資源(P操作),然後釋放空間資源(V操作)
  • 三種關係: 生產者與生產者(互斥)、生產者與消費者(同步(主要)和互斥)和消費者與消費者(互斥)

幾個變數成員

  • 佇列:陣列模擬
  • 容量:由使用者給定
  • 空間資源號誌:佇列的容量大小
  • 資料資源號誌:開始為0
  • 生產者的下標位置:開始為0
  • 消費者的下標位置:開始為0

程式碼範例:

#include<iostream>
#include<string.h>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
#include<unistd.h>
using namespace std;
//生產者:需要申請空間資源(P操作),然後釋放資料資源(V操作)
//消費者:需要申請空間資源(P操作),然後釋放空間資源(V操作)
template<class T>
class RingQueue
{
public:
    RingQueue(int capacity = 5):_capacity(capacity),_rq(capacity),_c_index(0),_p_index(0)
    {
      //初始化空間資源號誌,容量為5
      sem_init(&_blank_sem,0,_capacity);
      sem_init(&_data_sem,0,0);
    }
    ~RingQueue()
    {
      sem_destroy(&_blank_sem);
      sem_destroy(&_data_sem);
    }
private:
    void P(sem_t& sem)
{
      sem_wait(&sem);
    }
    void V(sem_t& sem)
    {
      sem_post(&sem);
    }
public:
  void GetData(T& data)
  {
    // consumer申請資料資源
    P(_data_sem);
    data = _rq[_c_index];
    _c_index = (_c_index + 1) % _capacity;
    // consumer釋放格子資源
    V(_blank_sem);
  }
  void PutData(const T& data)
  {
    // productor申請格子資源
    P(_blank_sem);
    _rq[_p_index] = data;
    _p_index = (_p_index + 1) % _capacity;
    // productor釋放資料資源
V(_data_sem);
  }

private:
    //vector容器模擬佇列
    vector<T> _rq;
    //佇列的容量
    size_t _capacity;
    //空間資源號誌
    sem_t _blank_sem;
    //資料資源號誌
    sem_t _data_sem;
    //消費者的下標位置:開始為0
    int _c_index;
    //生產者的下標位置:開始為0
    int _p_index;
};
RingQueue<int> *q ;
void* Consumer(void* arg)
  {
    long id = (long)arg;
    while (1){
      // 消費(取)資料
      int x;
      q->GetData(x);
      cout << "consumer " << id << " consumes a data: " << x << endl;
      sleep(1);// 後面可註釋,調整速度
   }
}
 
void* Productor(void* arg)
{
    long id = (long)arg;
    while (1){
      // 生產(放)資料
      int x = rand()%10 + 1;
      q->PutData(x);
      cout << "productor " << id << " produncs a data: " << x<< endl;
      sleep(1);// 後面可註釋,調整速度
    }
  }
int main()
{
	// 建立一個交易場所
    q = new RingQueue<int>();
    srand((size_t)time(nullptr));
  
    pthread_t p, c;
    pthread_create(&p, nullptr, Productor, (void*)(1));
    pthread_create(&c, nullptr, Consumer, (void*)(1));
 
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
 
    delete q;
    return EXIT_SUCCESS;
}

注意:生產者生成資料前需要申請空間資源號誌(P(_blank_sem)),申請不成功就掛起等待,等待號誌來了繼續獲得號誌,然後釋放資料資源號誌(V(_data_sem))
消費者消費資料前需要申請資料資源號誌(P(_data_sem)),申請不成功就掛起等待,等待號誌來了繼續獲得號誌,然後釋放空間資源號誌(V(_blank_sem))

執行結果如下:

  • 生產者和消費者執行速度一致

生產者生產完一個資料,然後消費者就消費了,二者步調一致,並行執行。

  • 生產者快,消費者慢

生產者速度快,一下字就把佇列塞滿了資料(開始時二者步調不一致),接著生產者如果再去申請空間號誌,此時已經申請不到了,只能掛起等待,消費者消費資料是否空間號誌,這是生產者才可以繼續生產,可以看出,在後面大部分時間,二者步調恢復一致了,且速度隨消費者。

  • 生產者慢,消費者快

生產生產者生產完一個資料,資料號誌加1,空間號誌減1,然後消費者裡馬消費了一個資料,資料號誌減1,空間號誌加1,此時資料號誌為0,消費者再去申請資料號誌,申請不到就掛起等待,只能等生產者在去生產釋放空間號誌,然後消費者才可以申請到。可以看出的是,佇列長時間是空的,二者步調一致,速度隨生產者。