RabbitMQ延时队列+supervisor守护进程消费
场景:用户A购买商品,点击下单,此时库存减少,商品只为等君支付,可惜,日复一日,辗转反侧,侧夜难眠,不见知音归来,该商品长厢厮守,孤独终生。桃叶儿尖上见,柳叶儿遮满了天,日思夜想用户A,何时把单结啊~~,(采用下单减库存的方法,超时未支付,库存还原)你无情无义,我也不必为你等待。
技术方案:RabbitMQ 消息队列实现订单延时处理,因为RabbitMQ也没有这样的实现方法,需要组合使用。死信机制,即设置消息过期时间,消息过期重新规划路由,写入另外一个队列,此时守护进程的消费者,开启消费任务。为了保证消费者一直持续高性能的运行,采用supervisor来监管,kill了就重启。
PHP amqplib客户端库安装:
composer require php-amqplib/php-amqplib
理论不多说了,可看其他文章,这里展示伪代码:
队列:要求有生产者、消费者
用户A下单操作,扮演角色:生产者 send.php
<?php
/**
* Created by PhpStorm.
* User: 奔跑吧笨笨
* Date: 2019/4/19
* Time: 11:51 AM
*/
require_once __DIR__.'/vendor/autoload.php';
$config = require_once __DIR__."/Config/config.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
//接收参数
$send_info = array();
$send_info['data'] = isset($_GET['data']) ? $_GET['data'] :'Hello World';
$send_info['type'] = isset($_GET['type']) ? $_GET['type'] : 1;
$send_info = json_encode($send_info,JSON_UNESCAPED_UNICODE);
$data = array();
$con_config = $config['rabbitmq'];
$connection = new AMQPStreamConnection($con_config['host'], $con_config['port'],$con_config['login'], $con_config['password'], $con_config['vhost']);
if(!$connection->isConnected()){
$data['code'] = 500;
$data['message'] = '建立连接失败';
echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}
$channel = $connection->channel();
if(!$channel){
$data['code'] = 500;
$data['message'] = '通道连接失败';
echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}
$message = date('Y-m-d H:i:s')." [x] 我将要发送\n";
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_echange', 'direct',false,false,false);
$table = new AMQPTable();
$table->set('x-dead-letter-exchange', 'delay_exchange');
$table->set('x-dead-letter-routing-key','delay_exchange');
//订单超时时间 注意单位为毫秒,有的文档记录是微秒,非也,不信者可自测
$time = 60*1000;
$table->set('x-message-ttl',$time);
$channel->queue_declare('cache_queue_5',false,true,false,false,false, $table);
$channel->queue_bind('cache_queue_5', 'cache_echange','cache_queue_5');
$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
$msg = new AMQPMessage($send_info,array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
));
$channel->basic_publish($msg,'cache_echange','cache_queue_5');
$message .= date('Y-m-d H:i:s')." [x] Sent '$send_info'\n ";
$channel->close();
$connection->close();
$data['code'] = 200;
$data['messge'] = $message;
echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
定时触发机制:消费者 receice.php 守护进程监听
<?php
/**
* Created by PhpStorm.
* User: 奔跑吧笨笨
* Date: 2019/4/19
* Time: 11:52 AM
*/
require_once __DIR__.'/vendor/autoload.php';
$config = require_once __DIR__."/Config/config.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$con_config = $config['rabbitmq'];
$connection = new AMQPStreamConnection($con_config['host'], $con_config['port'],$con_config['login'], $con_config['password'], $con_config['vhost']);
if(!$connection->isConnected()){
$data['code'] = 500;
$data['message'] = '建立连接失败';
echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}
$channel = $connection->channel();
if(!$channel){
$data['code'] = 500;
$data['message'] = '通道连接失败';
echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;
$callback = function ($msg){
echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
#手动回应报文
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//执行业务操作
include_once __DIR__.'/temp_task.php';
temp_task($msg->body);
};
//只有consumer已经处理并确认了上一条message时queue才分派新的message给它
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delay_queue','',false,false,false,false,$callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
这两个伪代码,就实现了一个延时触发机制,超过指定时间之后,执行业务操作,如果订单未支付,那么订单取消,库存回滚。
接下来:supervisor 的配置
环境:Centos
第一步:下载
yum install -y epel-release
yum install -y supervisor
第二步:配置
cd /etc/
mkdir supervisord.d
echo_supervisord_conf > supervisord.conf
vim /etc/supervisord.conf
#加入以下配置信息
[include]
files = /etc/supervisord.d/*.conf
第三步:开机启动服务文件 (写入文件位置 vim /etc/init.d/supervisord )
#!/bin/sh
#
# /etc/init.d/supervisord
#
# Supervisor is a client/server system that
# allows its users to monitor and control a
# number of processes on UNIX-like operating
# systems.
#
# chkconfig: - 64 36
# description: Supervisor Server
# processname: supervisord
# Source init functions
. /etc/rc.d/init.d/functions
prog="supervisord"
prog_bin="/usr/bin/supervisord"
PIDFILE="/var/run/$prog.pid"
start()
{
echo -n $"Starting $prog: "
###注意下面这一行一定得有-c /etc/supervisord.conf 不然修改了配置文件根本不生效!
daemon $prog_bin -c /etc/supervisord.conf --pidfile $PIDFILE
[ -f $PIDFILE ] && success $"$prog startup" || failure $"$prog startup"
echo
}
stop()
{
echo -n $"Shutting down $prog: "
[ -f $PIDFILE ] && killproc $prog || success $"$prog shutdown"
echo
}
case "$1" in
start)
start
;;
stop)
stop
;;
status)
status $prog
;;
restart)
stop
start
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
;;
esac
三、2执行命令
chmod +x /etc/init.d/supervisord
chkconfig --add supervisord
chkconfig supervisord on
service supervisord start
第四步:开启web查看
# 如果想通过web查看管理的进程,加入以下代码,监听9001,用户user,密码123
[inet_http_server]
port=9001
username=username
password=123456
第五步:创建监听任务 (创建在 vim /etc/supervisord.d/rabbitmq_worker.conf 文件)
[program:rabbitmq_worker]
command=php /home/wwwroot/default/rabbitmq/worker/receive.php ;启动时命令行执行操作
autostart=true ;是否随supervisor启动
autorestart=true ;是否在挂了之后重启,意外关闭后会重启,比如kill掉!
startsecs=5 ;持续运行多久,认为运行成功
startretries=3 ;启动尝试次数
stderr_logfile=/tmp/rabbitmq_worker_err.log ;标准错误输出的位置
stdout_logfile=/tmp/rabbitmq_worker_out.log ;标准输出的位置
第六步:重新启动supervisor服务
service supervisord restart
第七步:查看服务启动状态
[[email protected] supervisord.d]# lsof -i:9001
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
superviso 9452 root 4u IPv4 40107676 0t0 TCP *:etlservicemgr (LISTEN)
第八步:web方式查看
http://my_ip:9001
输入前面设置的账号密码,查看(end)
到此为止,一个成熟的延时处理方案就出炉了。
我为人人,人人为我,美美与共,天下大同。