PHP實現(xiàn)RabbitMQ消息列隊的示例代碼
業(yè)務(wù)場景
項目公司是主php做開發(fā)的,框架為thinkphp。眾所周知,php本身的運行效率存在一定的缺陷,所以如果有一個很復(fù)雜很耗時的業(yè)務(wù)時,必須開發(fā)一個常駐內(nèi)存的程序。首先我想到了php的workerman與swoole,但是這里應(yīng)上面的標(biāo)題哈,想將耗時任務(wù)交給另一個服務(wù)器,同時列隊處理。所以這里我想獨立部署一個rabbitMQ服務(wù)器用于處理列隊任務(wù)。
當(dāng)rabbitMQ服務(wù)器我們準(zhǔn)備好了,建立了一個持久化命名為ceshi的列隊,如下:

項目上生產(chǎn)者和消費者的開發(fā)我這里全部采用tinkphp6+workerman,為便于管理。這里這么做也是因為發(fā)現(xiàn)workerman中對rabbitMQ的文檔解釋太少了!
所以開始踩坑!
1、首先部署好thinkphp6框架
過程去看thinkphp6手冊
2、安裝workerman擴(kuò)展
過程去看thinkphp6手冊

3、生產(chǎn)者
配置一個workerman類


創(chuàng)建的Send類代碼如下:
<?php
namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
//websocket地址,一會用于測試。
protected $socket = 'websocket://127.0.0.1:2345';
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
//websocket發(fā)送過來的消息
$connection->send('我收到你的信息了:'.$data);
//rabbitMQ配置
$options = [
'host'=>'127.0.0.1',//rabbitMQ IP
'port'=>5672,//rabbitMQ 通訊端口
'user'=>'admin',//rabbitMQ 賬號
'password'=>'123456'//rabbitMQ 密碼
];
(new Client($options))->connect()->then(function (Client $client) {
return $client->channel();
})->then(function (Channel $channel) {
/**
* 創(chuàng)建隊列(Queue)
* name: ceshi // 隊列名稱
* passive: false // 如果設(shè)置true存在則返回OK,否則就報錯。設(shè)置false存在返回OK,不存在則自動創(chuàng)建
* durable: true // 是否持久化,設(shè)置false是存放到內(nèi)存中RabbitMQ重啟后會丟失,
* 設(shè)置true則代表是一個持久的隊列,服務(wù)重啟之后也會存在,因為服務(wù)會把持久化的Queue存放在硬盤上,當(dāng)服務(wù)重啟的時候,會重新加載之前被持久化的Queue
* exclusive: false // 是否排他,指定該選項為true則隊列只對當(dāng)前連接有效,連接斷開后自動刪除
* auto_delete: false // 是否自動刪除,當(dāng)最后一個消費者斷開連接之后隊列是否自動被刪除
*/
return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) use($data){
echo "發(fā)送消息內(nèi)容:".$data."\n";
/**
* 發(fā)送消息
* body 發(fā)送的數(shù)據(jù)
* headers 數(shù)據(jù)頭,建議 ['content_type' => 'text/plain'],這樣消費端是springboot注解接收直接是字符串類型
* exchange 交換器名稱
* routingKey 路由key
* mandatory
* immediate
* @return bool|PromiseInterface|int
*/
return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) {
//echo " [x] Sent 'Hello World!'\n";
$client = $channel->getClient();
return $channel->close()->then(function () use ($client) {
return $client;
});
})->then(function (Client $client) {
$client->disconnect();
});
}
/**
* 當(dāng)連接建立時觸發(fā)的回調(diào)函數(shù)
* @param $connection
*/
public function onConnect($connection)
{
}
/**
* 當(dāng)連接斷開時觸發(fā)的回調(diào)函數(shù)
* @param $connection
*/
public function onClose($connection)
{
}
/**
* 當(dāng)客戶端的連接上發(fā)生錯誤時觸發(fā)
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
/**
* 每個進(jìn)程啟動
* @param $worker
*/
public function onWorkerStart($worker)
{
}
}上述都OK以后咱們可以項目路徑下通過命令啟動這個生產(chǎn)者:
php think worker:server

測試發(fā)送數(shù)據(jù):

通過這個網(wǎng)站
連接【ws://127.0.0.1:2345】后發(fā)送數(shù)據(jù)!

前往rabbitMQ控制臺

列隊中有一條消息產(chǎn)生并且等待了!
這個時候你可能問,如果我發(fā)送數(shù)據(jù)不想通過ws發(fā)送而是接口發(fā)送怎么辦?
笨思路唄:接口給內(nèi)置服務(wù)器發(fā)消息->內(nèi)置服務(wù)去發(fā)消息給rabbitMQ

將協(xié)議改為tcp
然后重新啟動服務(wù)

然后去tp6創(chuàng)建一個路由接口

接口代碼
<?php
namespace app\controller;
use app\BaseController;
class Index extends BaseController
{
public function index(string $msg)
{
//連接本地tcp服務(wù)
$client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
//發(fā)送字符串
fwrite($client, $msg."\n");
//斷開服務(wù)
fclose($client);
return 'OK';
}
}執(zhí)行結(jié)果:

說明接口成功的將數(shù)據(jù)發(fā)送給了本地內(nèi)置的tcp服務(wù)。

同時,內(nèi)置服務(wù)將收到的數(shù)據(jù)給了rabbitMQ服務(wù)列隊中。
生產(chǎn)者完成。
4、消費者
同生產(chǎn)者一樣新創(chuàng)建一個thinkphp6及安裝workerman擴(kuò)展,注意端口別和生產(chǎn)者沖突!這里我設(shè)置的是2346端口

創(chuàng)建的Receive類代碼如下:
<?php
namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
protected $socket = 'tcp://127.0.0.1:2346';
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
}
/**
* 當(dāng)連接建立時觸發(fā)的回調(diào)函數(shù)
* @param $connection
*/
public function onConnect($connection)
{
}
/**
* 當(dāng)連接斷開時觸發(fā)的回調(diào)函數(shù)
* @param $connection
*/
public function onClose($connection)
{
}
/**
* 當(dāng)客戶端的連接上發(fā)生錯誤時觸發(fā)
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
/**
* 每個進(jìn)程啟動
* @param $worker
*/
public function onWorkerStart($worker)
{
//rabbitMQ配置
$options = [
'host'=>'127.0.0.1',//rabbitMQ IP
'port'=>5672,//rabbitMQ 通訊端口
'user'=>'admin',//rabbitMQ 賬號
'password'=>'123456'//rabbitMQ 密碼
];
(new Client($options))->connect()->then(function (Client $client) {
return $client->channel();
})->then(function (Channel $channel) {
/**
* 創(chuàng)建隊列(Queue)
* name: ceshi // 隊列名稱
* passive: false // 如果設(shè)置true存在則返回OK,否則就報錯。設(shè)置false存在返回OK,不存在則自動創(chuàng)建
* durable: true // 是否持久化,設(shè)置false是存放到內(nèi)存中RabbitMQ重啟后會丟失,
* 設(shè)置true則代表是一個持久的隊列,服務(wù)重啟之后也會存在,因為服務(wù)會把持久化的Queue存放在硬盤上,當(dāng)服務(wù)重啟的時候,會重新加載之前被持久化的Queue
* exclusive: false // 是否排他,指定該選項為true則隊列只對當(dāng)前連接有效,連接斷開后自動刪除
* auto_delete: false // 是否自動刪除,當(dāng)最后一個消費者斷開連接之后隊列是否自動被刪除
*/
return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) {
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$channel->consume(
function (Message $message, Channel $channel, Client $client) {
echo "接收消息內(nèi)容:", $message->content, "\n";
},
'ceshi',
'',
false,
true
);
});
}
}都OK以后咱們可以項目路徑下通過命令啟動這個消費者:
php think worker:server
此時應(yīng)該會自動消費掉rabbitMQ中等待的消息!


到這里消費者也就結(jié)束啦!
5、整體測試
接下來我用cmd來啟動兩個服務(wù),然后用接口發(fā)送消息和消費測試!

至于具體怎么靈活應(yīng)用自行開拓大腦哦~
比如php項目有些業(yè)務(wù)吃力,可以去做個java的消費端,讓java來完成任務(wù)~

以上就是PHP實現(xiàn)RabbitMQ消息列隊的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于PHP RabbitMQ消息列隊的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
php+redis在實際項目中HTTP 500: Internal Server Error故障排除
用戶量快速增長,訪問量在短時間內(nèi)翻倍,由于前期容量規(guī)劃做得比較好,硬件資源可以支撐,可是軟件系統(tǒng)方面出現(xiàn)了大問題:40% 的請求都會返回 HTTP 500: Internal Server Error2017-02-02
PHP CURLFile函數(shù)模擬實現(xiàn)文件上傳示例詳解
這篇文章主要介紹了PHP使用CURLFile函數(shù)模擬實現(xiàn)文件上傳,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-09-09
windows下配置apache+php+mysql時出現(xiàn)問題的處理方法
windows下配置apache+php+mysql應(yīng)該是每個phper必須掌握的基礎(chǔ)技能了,這也是熟悉php的一個過程,小編當(dāng)年自己配環(huán)境的時候也遇到過這樣那樣的問題,現(xiàn)在把當(dāng)時記錄的幾個問題的處理方法分享給大家2014-06-06

