基於 Hyperf + WebSocket + RabbitMQ 實現的一個簡單的大螢幕訊息推播。
思路
利用 WebSocket 協定讓用戶端和伺服器端保持有狀態的長連線,
並儲存連線上來的用戶端 id;接著訂閱發布者發布的訊息,針對已儲存的用戶端 id 進行廣播。
WebSocket 服務
composer require hyperf/websocket-server
組態檔 [config/autoload/server.php]
<?php

declare(strict_types=1);

use Hyperf\Server\Server;
use Hyperf\Server\SwooleEvent;

// Server definitions: one plain HTTP listener and one WebSocket listener.
// NOTE(review): only 'mode' and 'servers' appear in this excerpt; confirm
// whether further top-level keys of the real config file were omitted on purpose.
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        // HTTP server listening on port 11111.
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        // WebSocket server listening on port 12222 (the port the front-end connects to).
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],
];
WebSocket 伺服器端程式碼範例
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Redis\Redis;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

/**
 * WebSocket endpoint that tracks connected client ids (fds) in a Redis set
 * so that other parts of the application can broadcast to them.
 */
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * Redis set holding the fd of every connected client.
     */
    private const CLIENT_SET_KEY = 'websocket_sjd_1';

    /**
     * Lifetime of the client set in seconds; renewed on open and on every message.
     */
    private const CLIENT_SET_TTL = 7200;

    /**
     * Handle an incoming frame: renew the client set's expiry (heartbeat)
     * and echo the payload back to the sender.
     *
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        $redis = $this->container->get(Redis::class);

        // All client ids currently stored in the set.
        $fdList = $redis->sMembers(self::CLIENT_SET_KEY) ?: [];

        // Loose comparison is intentional: the fd is an int while Redis
        // returns set members as strings.
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd(self::CLIENT_SET_KEY, $frame->fd);
            $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        }

        $server->push($frame->fd, 'Recv: ' . $frame->data);
    }

    /**
     * Handle a client disconnect by removing its id from the client set.
     *
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        $redis = $this->container->get(Redis::class);
        $redis->sRem(self::CLIENT_SET_KEY, $fd);
        var_dump('closed');
    }

    /**
     * Handle a new connection: store the client id, renew the set's expiry
     * and greet the client.
     *
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        $redis = $this->container->get(Redis::class);

        $res1 = $redis->sAdd(self::CLIENT_SET_KEY, $request->fd);
        var_dump($res1);

        $res = $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        var_dump($res);

        $server->push($request->fd, 'Opened');
    }
}
WebSocket 前端程式碼
/**
 * Open a WebSocket to the push server, keep it alive with a periodic ping,
 * and render pushed records into the on-screen table.
 *
 * Messages are expected to be JSON objects with a numeric `code`:
 *   300 -> update the ".address" element with `address`
 *   200 -> prepend `data` as a new row, keeping at most 7 rows
 * Frames that are not JSON are logged and ignored.
 */
function WebSocketTest() {
    if (!("WebSocket" in window)) {
        alert("您的瀏覽器不支援 WebSocket!");
        return;
    }

    console.log("您的瀏覽器支援 WebSocket!");

    var num = 0;               // number of rows currently displayed
    var heartbeatTimer = null; // interval id of the ping timer

    // Open the web socket.
    var ws = new WebSocket("ws://127.0.0.1:12222");

    ws.onopen = function () {
        // Start the heartbeat only once the socket is open: calling send()
        // while the socket is still connecting throws an InvalidStateError.
        heartbeatTimer = window.setInterval(function () {
            // Send a ping every 5 seconds so the connection is not dropped on idle timeout.
            if (ws.readyState === WebSocket.OPEN) {
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }
        }, 5000);
    };

    ws.onmessage = function (evt) {
        var d;
        try {
            d = JSON.parse(evt.data);
        } catch (e) {
            // The server also sends plain-text frames; they are not JSON and
            // must not break the handler.
            console.log(evt.data);
            return;
        }
        console.log(d);

        if (d === null || typeof d !== "object") {
            return;
        }

        if (d.code == 300) {
            $(".address").text(d.address);
        }

        if (d.code == 200) {
            var v = d.data;
            console.log(v);
            num++;
            var str = `<div class="item">
                <p>${v.recordOutTime}</p>
                <p>${v.userOutName}</p>
                <p>${v.userOutNum}</p>
                <p>${v.doorOutName}</p>
            </div>`;
            $(".tableHead").after(str);
            // Keep at most 7 rows on screen: drop the oldest (last) one.
            if (num > 7) {
                num--;
                $(".table .item:nth-last-child(1)").remove();
            }
        }
    };

    // The handler property is `onerror`; assigning to `error` never fires.
    ws.onerror = function (e) {
        console.log(e);
        alert(e);
    };

    ws.onclose = function () {
        // Stop pinging a socket that is gone.
        if (heartbeatTimer !== null) {
            window.clearInterval(heartbeatTimer);
            heartbeatTimer = null;
        }
        alert("連線已關閉...");
    };
}
AMQP 元件
composer require hyperf/amqp
組態檔 [config/autoload/amqp.php]
<?php

// Connection-pool options for the default AMQP connection.
$pool = [
    'min_connections' => 1,
    'max_connections' => 10,
    'connect_timeout' => 10.0,
    'wait_timeout' => 3.0,
    'heartbeat' => -1,
];

// Low-level connection parameters for the default AMQP connection.
$params = [
    'insist' => false,
    'login_method' => 'AMQPLAIN',
    'login_response' => null,
    'locale' => 'en_US',
    'connection_timeout' => 3.0,
    'read_write_timeout' => 6.0,
    'context' => null,
    'keepalive' => false,
    'heartbeat' => 3,
];

// The 'default' connection: a local broker with the guest account on vhost "/".
return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'pool' => $pool,
        'params' => $params,
    ],
];
MQ 消費者程式碼
<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Redis\Redis;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * RabbitMQ consumer: broadcast every received message to all WebSocket
     * clients whose ids are stored in the Redis client set.
     *
     * @param mixed $data message payload as delivered by the AMQP component
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        print_r($data);

        // All client ids stored in the set; fall back to an empty list if
        // Redis returns a non-array (e.g. false on failure).
        $redis = $this->container->get(Redis::class);
        $fdList = $redis->sMembers('websocket_sjd_1') ?: [];

        $server = $this->container->get(ServerFactory::class)->getServer()->getServer();

        // push() needs a string; encode anything else instead of letting
        // strict typing raise a TypeError.
        $payload = is_string($data) ? $data : (string) json_encode($data);

        foreach ($fdList as $v) {
            if (empty($v)) {
                continue;
            }

            $fd = (int) $v;

            // The set may contain ids of clients that are already gone
            // (e.g. left over from a previous run); skip those.
            if ($server->isEstablished($fd)) {
                $server->push($fd, $payload);
            }
        }

        return Result::ACK;
    }
}
控制器程式碼
/**
 * Demo endpoint: publish a sample record to RabbitMQ and return the request
 * method together with the `user` input (defaults to "Hyperf").
 *
 * @return array
 */
public function test()
{
    $data = [
        'code' => 200,
        'data' => [
            'userOutName' => 'ccflow',
            'userOutNum' => '9999',
            'recordOutTime' => date('Y-m-d H:i:s'),
            'doorOutName' => '教師公寓',
        ],
    ];

    // Namespaced Guzzle helper; must be called fully qualified.
    $data = \GuzzleHttp\json_encode($data);

    $message = new DemoProducer($data);
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $result = $producer->produce($message);
    var_dump($result);

    $user = $this->request->input('user', 'Hyperf');
    $method = $this->request->getMethod();

    return [
        'method' => $method,
        'message' => "{$user}.",
    ];
}
最終效果
推薦:《PHP教學》
以上就是如何基於Hyperf實現RabbitMQ+WebSocket訊息推播的詳細內容,更多請關注TW511.COM其它相關文章!