PHP實(shí)現(xiàn)異步延遲消息隊(duì)列的方法詳解
一、前言
需求:電商秒殺場(chǎng)景中,如果用戶下單10分鐘未支付,需要進(jìn)行庫(kù)存歸還
本篇是用PHP+Laravel+RabbitMQ來(lái)實(shí)現(xiàn)異步延遲消息隊(duì)列
二、場(chǎng)景
在電商項(xiàng)目中,當(dāng)我們下單之后,一般需要 20 分鐘之內(nèi)或者 30 分鐘之內(nèi)付款,否則訂單就會(huì)進(jìn)入異常處理邏輯中,被取消,那么進(jìn)入到異常處理邏輯中,就可以當(dāng)成是一個(gè)延遲隊(duì)列
公司的會(huì)議預(yù)定系統(tǒng),在會(huì)議預(yù)定成功后,會(huì)在會(huì)議開(kāi)始前半小時(shí)通知所有預(yù)定該會(huì)議的用戶
安全工單超過(guò) 24 小時(shí)未處理,則自動(dòng)拉企業(yè)微信群提醒相關(guān)責(zé)任人
用戶下單外賣以后,距離超時(shí)時(shí)間還有 10 分鐘時(shí)提醒外賣小哥即將超時(shí)
…
很多場(chǎng)景下我們都需要延遲隊(duì)列。
本文以 RabbitMQ 為例來(lái)和大家聊一聊延遲隊(duì)列的玩法。
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件來(lái)實(shí)現(xiàn)定時(shí)任務(wù),這種方案較簡(jiǎn)單。
三、安裝RabbitMQ延遲隊(duì)列插件
官網(wǎng)插件下載地址

我這里直接下載了最新版本,你們根據(jù)自己的rabbitmq版本號(hào)進(jìn)行下載

把下載好的文件移動(dòng)到rabbitmq的插件plugins下,以我自己的Mac為例子,放到了如下路徑

然后執(zhí)行安裝插件指令,如下
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange

最后重啟rabbitmq服務(wù),并刷新查看exchanges交換機(jī)有沒(méi)有該插件

如上圖則延遲消息隊(duì)列插件安裝完成
四、在Laravel框架中進(jìn)行使用
新建rabbitmq服務(wù)類,包含延遲消息隊(duì)列生產(chǎn)消息,和消費(fèi)消息,如下

代碼如下:
<?php
namespace App\Http\Controllers\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitmqServer
{
private $host = "127.0.0.1";
private $port = 5672;
private $user = "guest";
private $password = "guest";
private $msg;
private $channel;
private $connection;
// 過(guò)期時(shí)間
const TIMEOUT_5_S = 5; // 5s
const TIMEOUT_10_S = 10; // 10s
private $exchange_logs = "logs";
private $exchange_direct = "direct";
private $exchange_delayed = "delayed";
private $queue_delayed = "delayedQueue";
const EXCHANGETYPE_FANOUT = "fanout";
const EXCHANGETYPE_DIRECT = "direct";
const EXCHANGETYPE_DELAYED = "x-delayed-message";
public function __construct($type = false)
{
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
$this->channel = $this->connection->channel();
// 聲明Exchange
$this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
$this->channel->queue_declare($this->queue_delayed, false, true, false, false);
$this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed);
}
/**
* delay creat message
*/
public function createMessageDelay($msg, $time)
{
$delayConfig = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
];
$msg = new AMQPMessage($msg, $delayConfig);
return $msg;
}
/**
* delay send message
*/
public function sendDelay($msg, $time = self::TIMEOUT_10_S)
{
$msg = $this->createMessageDelay($msg, $time);;
$this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed);
$this->channel->close();
$this->connection->close();
}
/**
* delay consum
*/
public function consumDelay()
{
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
$this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
$this->channel->close();
$this->connection->close();
}
}比如新建QueueController控制器,進(jìn)行測(cè)試生產(chǎn)消息放到延遲消息隊(duì)列中

代碼如下:
<?php
namespace App\Http\Controllers\Api\v1;
use App\Http\Controllers\Controller;
use App\Http\Controllers\Service\RabbitmqServer;
use App\Jobs\Queue;
use Illuminate\Http\Request;
class QueueController extends Controller
{
//
public function index(Request $request)
{
//比如說(shuō)現(xiàn)在是下訂單操作
//需求:如果用戶10分鐘之內(nèi)不支付訂單就要取消訂單,并且?guī)齑鏆w還
$msg = $request->post();
$Rabbit = new RabbitmqServer("x-delayed-message");
//第一個(gè)參數(shù)發(fā)送的消息,第二個(gè)參數(shù)延遲多少秒
$Rabbit->sendDelay(json_encode($msg),5);
}
}至此通過(guò)接口調(diào)試工具進(jìn)行模擬生產(chǎn)消息即可
消息生產(chǎn)完畢要進(jìn)行消費(fèi),這里使用的是Laravel的任務(wù)調(diào)度,代碼如下

<?php
namespace App\Console\Commands;
use App\Http\Controllers\Service\RabbitmqServer;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_consumer';//給消費(fèi)者起個(gè)command名稱
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* @return int
*/
public function handle()
{
$Rabbit = new RabbitmqServer("x-delayed-message");
$Rabbit->consumDelay();
}
}五、執(zhí)行生產(chǎn)消息和消費(fèi)消息
用postman模擬生產(chǎn)消息,效果如下:

然后消費(fèi)消息,用一下命令,如果延遲5秒執(zhí)行消費(fèi)則成功

至此,就完成了rabbitmq異步延遲消息隊(duì)列
到此這篇關(guān)于PHP實(shí)現(xiàn)異步延遲消息隊(duì)列的方法詳解的文章就介紹到這了,更多相關(guān)PHP異步延遲消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
PHP隨機(jī)生成隨機(jī)個(gè)數(shù)的字母組合示例
在很多系統(tǒng)環(huán)境下大家都會(huì)用到字母組合各種編碼。下面為大家介紹下使用php隨機(jī)生成隨機(jī)個(gè)數(shù)的字母組合,感興趣的朋友可以了解下2014-01-01
PHP中header()函數(shù)的七種用法小結(jié)
我們?cè)趯?shí)際開(kāi)發(fā)中經(jīng)常使用header()實(shí)現(xiàn)一些功能,這篇文章介紹關(guān)于header()的7中用法,文中有詳細(xì)的代碼示例,具有一定的參考價(jià)值,需要的朋友可以參考下2023-08-08
在VSCode中配置PHP開(kāi)發(fā)環(huán)境的實(shí)戰(zhàn)步驟
最近要寫一些可視化的網(wǎng)站,所以先把需要的環(huán)境配好吧,下面這篇文章主要給大家介紹了關(guān)于在VSCode中配置PHP開(kāi)發(fā)環(huán)境的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-11-11
使用PHP實(shí)現(xiàn)蜘蛛訪問(wèn)日志統(tǒng)計(jì)
本篇文章是對(duì)使用PHP實(shí)現(xiàn)蜘蛛訪問(wèn)日志統(tǒng)計(jì)的代碼進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-07-07
PHP應(yīng)用代碼復(fù)雜度檢測(cè)使用方法
這篇文章主要為大家介紹了PHP應(yīng)用代碼復(fù)雜度檢測(cè)使用方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
php數(shù)組函數(shù)序列之sort() 對(duì)數(shù)組的元素值進(jìn)行升序排序
sort() 函數(shù)按升序?qū)o定數(shù)組的值排序。注釋:本函數(shù)為數(shù)組中的單元賦予新的鍵名。原有的鍵名將被刪除2011-11-11

