詳解PHP訊息佇列的實現以及運用(附流程圖)

2022-10-27 18:00:48

訊息佇列的概念、原理、實現方式

概念

  • 佇列結構的一箇中介軟體
  • 不需要立即消費訊息
  • 由消費者或者訂閱者進行按順序消費

基本的流程圖如下所示

  • 流程
    ca434f67a2b74693d8a8db235ff9d3d.jpg

應用場景

  • 冗餘
  • 解耦
  • 流量削峰
  • 非同步通訊

實現方式

  • mysql:可靠、速度慢
  • redis:速度快,對於大訊息包處理較慢
  • 訊息系統:可靠、專業性強

訊息的觸發機制

  • 死迴圈的方式,故障時無法及時恢復
  • 定時任務:壓力均分、但是處理量有上限
  • 守護行程的方式

解耦 (訂單和配送系統)

  • 架構設計1 採用定時任務的方式
    1013f8bafa2312f0f60c2f8253eb0f2.jpg

    php入門到就業線上直播課:進入學習
    Apipost = Postman + Swagger + Mock + Jmeter 超好用的API偵錯工具:

  • 使用配送處理系統進行處理時,將當前資料庫裡需要處理的訂單狀態更新為2,待處理完成後將狀態設為1

  • 可以每次指定更新多少條資料

流量削鋒 (redis實現秒殺)

  • 使用佇列的資料結構

    • lpush/rpush 將資料放入列表中
    • lpop/rpop 將資料移除列表並獲取到移除的值
    • ltrim 保留指定區間內的元素
    • llen 獲取列表長度
    • lset 通過索引設定列表的值
    • lindex 通過索引獲取列表中的值
    • lrange 獲取指定範圍的元素
  • 圖示如下
    03e20c9fdf5009f0ad7383988c42c6f.jpg

  • 程式碼流程如下

    • 秒殺程式將請求寫入redis(uid,time)

    • 檢查redis列表存放的長度,超過10個直接捨棄

    • 通過死迴圈讀取redis資料,並存入資料庫

      // Spike.php 秒殺程式if(Redis::llen('lottery') < 10){
          // 成功
          Redis::lpush('lottery', $uid.'%'.microtime());}else{
          // 失敗}
      登入後複製
      // Warehousing.php 入庫程式while(true){
          $user = Redis::rpop('lottery');
          if (!$user || $user == 'nil') {
              sleep(2);
              continue;
          }
          $user_arr = explode($user, '%');
          $insert_user = [
              'uid' => $user_arr[0],
              'time' => $user_arr[1]
          ];
          $res = DB::table('lottery_queue')->insert($insert_user);
          if (!$res) {
              Redis::lpush('lottery', $user);
          }}
      登入後複製
  • 上述程式碼中假如並行過大的話會存在超賣的情況,此時可以使用檔案鎖或者redis分散式鎖進行控制,先將商品放入redis list中 使用rpop進行取出,如果取不到則說明已經賣完

  • 具體的思路及虛擬碼如下

      // 先將商品放入redis中
      $goods_id = 2;
    
      $sql = select id,num from goods where id = $goods_id;
      $res = DB::select($sql);
      if (!empty($res)) {
          // 也可以指定多少件
          Redis::del('lottery_goods' . $goods_id);
          for($i=0;$i<$res['num'];$i++){
              Redis::lpush('lottery_goods . $goods_id', $i);
          }
          LOG::info('商品存入佇列成功,數量:' . Redis::llen('lottery_goods . $goods_id'));
      } else {
          LOG::info($goods_id . '加入失敗');
      }
    登入後複製
      // 開始秒殺
      $count = Redis::rpop('lottery_goods' . $goods_id);
      if (!$count) {
          // 商品已搶完
          ...
      }
    
      // 使用者搶購佇列
      $user_list = 'user_goods_id_' . $goods_id;
      $user_status = Redis::sismember($user_list, $user_id);
      if ($user_status) {
          // 已搶過
          ...
      }
    
      // 將搶到的放到列表中
      Redis::sadd($user_list, $uid);
      $msg = '使用者:' . $uid . '順序' . $count;
      Log::info($msg);
      // 生成訂單等
      ...
      // 減庫存
      $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超賣
      DB::update($sql)
      // 搶購成功
    登入後複製

rabbitmq

  • 架構及原理
    1fefd7af11b134a7ce893e66a6bd505.jpg
    其中P代表生產者,X為交換機(channal),C代表消費者

  • 簡單使用

      // Send.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      // 建立通道
      $channel = $connection->channel();
      // 宣告一個佇列
      $channel->queue_declare('user_email', false, false, false, false);
      // 製作訊息
      $msg = new AMQPMessage('send email');
      // 將訊息推播到佇列
      $channel->basic_publish($msg, '', 'user_email');
    
      echo '[x] send email';
    
      $channel->close();
      $connection->close();
    登入後複製
      // Receive.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      //建立通道
      $channel = $connection->channel();
    
      $channel->queue_declare('user_email', false, false, false, false);
    
      // 當收到訊息時的回撥函數
      $callback = function($msg){
          //傳送郵件
          echo 'Received '.$msg->body.'\n';
      };
    
      $channel->basic_consume('user_email', '', false, true, false, false, $callback);
    
      // 保持監聽狀態
      while($channel->is_open()){
          $channel->wait();
      }
    登入後複製

以上就是詳解PHP訊息佇列的實現以及運用(附流程圖)的詳細內容,更多請關注TW511.COM其它相關文章!