RabbitMQ延时队列+supervisor守护进程消费

 

场景:用户A购买商品,点击下单,此时库存减少,商品只为等君支付,可惜,日复一日,辗转反侧,侧夜难眠,不见知音归来,该商品长厢厮守,孤独终生。桃叶儿尖上见,柳叶儿遮满了天,日思夜想用户A,何时把单结啊~~,(采用下单减库存的方法,超时未支付,库存还原)你无情无义,我也不必为你等待。

RabbitMQ延时队列+supervisor守护进程消费

技术方案:RabbitMQ 消息队列实现订单延时处理,因为RabbitMQ也没有这样的实现方法,需要组合使用。死信机制,即设置消息过期时间,消息过期重新规划路由,写入另外一个队列,此时守护进程的消费者,开启消费任务。为了保证消费者一直持续高性能的运行,采用supervisor来监管,kill了就重启。

PHP amqplib客户端库安装:

composer require php-amqplib/php-amqplib

 

理论不多说了,可看其他文章,这里展示伪代码:

队列:要求有生产者、消费者

 

用户A下单操作,扮演角色:生产者 send.php

<?php
/**
 * Created by PhpStorm.
 * User: 奔跑吧笨笨
 * Date: 2019/4/19
 * Time: 11:51 AM
 */
require_once __DIR__.'/vendor/autoload.php';
$config = require_once __DIR__."/Config/config.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;


//接收参数
$send_info = array();
$send_info['data'] = isset($_GET['data']) ? $_GET['data'] :'Hello World';
$send_info['type'] = isset($_GET['type']) ? $_GET['type'] : 1;

$send_info = json_encode($send_info,JSON_UNESCAPED_UNICODE);


$data = array();
$con_config = $config['rabbitmq'];

$connection = new AMQPStreamConnection($con_config['host'], $con_config['port'],$con_config['login'], $con_config['password'], $con_config['vhost']);

if(!$connection->isConnected()){
    $data['code'] = 500;
    $data['message'] = '建立连接失败';
    echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}

$channel = $connection->channel();

if(!$channel){
    $data['code'] = 500;
    $data['message'] = '通道连接失败';
    echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}

$message = date('Y-m-d H:i:s')." [x] 我将要发送\n";

$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_echange', 'direct',false,false,false);

$table = new AMQPTable();
$table->set('x-dead-letter-exchange', 'delay_exchange');
$table->set('x-dead-letter-routing-key','delay_exchange');
//订单超时时间  注意单位为毫秒,有的文档记录是微秒,非也,不信者可自测
$time = 60*1000;
$table->set('x-message-ttl',$time);

$channel->queue_declare('cache_queue_5',false,true,false,false,false, $table);
$channel->queue_bind('cache_queue_5', 'cache_echange','cache_queue_5');

$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');



$msg = new AMQPMessage($send_info,array(
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
));

$channel->basic_publish($msg,'cache_echange','cache_queue_5');
$message .= date('Y-m-d H:i:s')." [x] Sent '$send_info'\n ";

$channel->close();
$connection->close();

$data['code'] = 200;
$data['messge'] = $message;
echo json_encode($data,JSON_UNESCAPED_UNICODE);die;

 

定时触发机制:消费者   receice.php  守护进程监听

<?php
/**
 * Created by PhpStorm.
 * User: 奔跑吧笨笨
 * Date: 2019/4/19
 * Time: 11:52 AM
 */
require_once __DIR__.'/vendor/autoload.php';
$config = require_once __DIR__."/Config/config.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;


$con_config = $config['rabbitmq'];

$connection = new AMQPStreamConnection($con_config['host'], $con_config['port'],$con_config['login'], $con_config['password'], $con_config['vhost']);

if(!$connection->isConnected()){
    $data['code'] = 500;
    $data['message'] = '建立连接失败';
    echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}

$channel = $connection->channel();

if(!$channel){
    $data['code'] = 500;
    $data['message'] = '通道连接失败';
    echo json_encode($data,JSON_UNESCAPED_UNICODE);die;
}


$channel->exchange_declare('delay_exchange', 'direct',false,false,false);


$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');

echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;

$callback = function ($msg){

    echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
    #手动回应报文
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    //执行业务操作
    include_once __DIR__.'/temp_task.php';
    temp_task($msg->body);
};

//只有consumer已经处理并确认了上一条message时queue才分派新的message给它
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delay_queue','',false,false,false,false,$callback);

while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

 

这两个伪代码,就实现了一个延时触发机制,超过指定时间之后,执行业务操作,如果订单未支付,那么订单取消,库存回滚。

接下来:supervisor 的配置  

环境:Centos

第一步:下载

yum install -y epel-release
yum install -y supervisor

 

第二步:配置

cd /etc/
mkdir supervisord.d
echo_supervisord_conf > supervisord.conf
vim /etc/supervisord.conf
#加入以下配置信息
[include]
files = /etc/supervisord.d/*.conf

第三步:开机启动服务文件 (写入文件位置 vim   /etc/init.d/supervisord )

#!/bin/sh
#
# /etc/init.d/supervisord
#
# Supervisor is a client/server system that
# allows its users to monitor and control a
# number of processes on UNIX-like operating
# systems.
#
# chkconfig: - 64 36
# description: Supervisor Server
# processname: supervisord
 
# Source init functions
. /etc/rc.d/init.d/functions
 
prog="supervisord"

prog_bin="/usr/bin/supervisord"
PIDFILE="/var/run/$prog.pid"
 
start()
{
       echo -n $"Starting $prog: "
       ###注意下面这一行一定得有-c /etc/supervisord.conf   不然修改了配置文件根本不生效!
       daemon $prog_bin -c /etc/supervisord.conf --pidfile $PIDFILE
       [ -f $PIDFILE ] && success $"$prog startup" || failure $"$prog startup"
       echo
}
 
stop()
{
       echo -n $"Shutting down $prog: "
       [ -f $PIDFILE ] && killproc $prog || success $"$prog shutdown"
       echo
}
 
case "$1" in
 
 start)
   start
 ;;
 
 stop)
   stop
 ;;
 
 status)
       status $prog
 ;;
 
 restart)
   stop
   start
 ;;
 
 *)
   echo "Usage: $0 {start|stop|restart|status}"
 ;;
 
esac

三、2执行命令 

chmod +x /etc/init.d/supervisord
chkconfig --add supervisord
chkconfig supervisord on
service supervisord start

 第四步:开启web查看

# 如果想通过web查看管理的进程,加入以下代码,监听9001,用户user,密码123
[inet_http_server] 
port=9001
username=username     
password=123456

第五步:创建监听任务   (创建在 vim  /etc/supervisord.d/rabbitmq_worker.conf 文件)

[program:rabbitmq_worker]
command=php /home/wwwroot/default/rabbitmq/worker/receive.php     ;启动时命令行执行操作
autostart=true                           ;是否随supervisor启动
autorestart=true                         ;是否在挂了之后重启,意外关闭后会重启,比如kill掉!
startsecs=5                              ;持续运行多久,认为运行成功
startretries=3                           ;启动尝试次数
stderr_logfile=/tmp/rabbitmq_worker_err.log        ;标准错误输出的位置
stdout_logfile=/tmp/rabbitmq_worker_out.log        ;标准输出的位置

第六步:重新启动supervisor服务

service supervisord restart

第七步:查看服务启动状态

[[email protected] supervisord.d]# lsof -i:9001
COMMAND    PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
superviso 9452 root    4u  IPv4 40107676      0t0  TCP *:etlservicemgr (LISTEN)

第八步:web方式查看

http://my_ip:9001

输入前面设置的账号密码,查看(end)

 

到此为止,一个成熟的延时处理方案就出炉了。

 

我为人人,人人为我,美美与共,天下大同。