php Swoole实现毫秒级定时任务
项目开发中,如果有定时任务的业务要求,我们会使用linux的crontab来解决,但是它的最小粒度是分钟级别,如果要求粒度是秒级别的,甚至毫秒级别的,crontab就无法满足,值得庆幸的是swoole提供的强大的毫秒定时器。
应用场景举例
我们可能会遇到这样的场景:
-
场景一:每隔30秒获取一次本机内存使用率
-
场景二:2分钟后执行报表发送任务
-
场景三:每天凌晨2点钟定时请求第三方接口,如果接口有数据返回则停止任务,如果接口由于某种原因没有响应或者没有数据返回则5分钟后继续尝试请求该接口,尝试5次后仍然失败则停止该任务
以上的三个场景我们都可以归纳为定时任务的范畴。
Swoole毫秒定时器
Swoole提供了异步毫秒定时器函数:
swoole_timer_tick(int $msec, callable $callback):设置一个间隔时钟定时器,每隔$msec毫秒执行一次$callback,类似于javascript中的setInterval()。
swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的时间$after_time_ms后执行$callback_function,类似于javascript的setTimeout()。
swoole_timer_clear(int $timer_id):删除指定id的定时器,类似于javascript的clearInterval()。
解决方案
对于场景一,经常用在系统检测统计方面,实时性要求比较高,但又能控制好频率,多用于后台服务器性能监控,可以生成可视化图表。可以是30秒获取一次内存使用率,也可以是10秒,而crontab最小粒度只能设置为1分钟。
1 swoole_timer_tick(30000, function($timer) use ($task_id) { // 启用定时器,每30秒执行一次
2 $memPercent = $this->getMemoryUsage(); //计算内存使用率
3 echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."\n";
4 });
对于场景二,直接定义xx时间后执行某项任务的话,貌似crontab比较困难,而使用swoole的swoole_timer_after可以实现:
1 swoole_timer_after(120000, function() use ($str) { //2分钟后执行
2 $this->sendReport(); //发送报表
3 echo "send report, $str\n";
4 });
对于场景三,用来作尝试请求,请求失败后继续,如果成功则停止请求。用crontab也能解决,但是比较傻,比如设置每隔5分钟请求一次,不管成功会失败都会去执行一次。而用swoole定时器则智能多了。
1 swoole_timer_tick(5*60*1000, function($timer) use ($url) { // 启用定时器,每5分钟执行一次
2 $rs = $this->postUrl($url);
3
4 if ($rs) {
5 //业务代码...
6 swoole_timer_clear($timer); // 停止定时器
7 echo date('Y-m-d H:i:s'). "请求接口任务执行成功\n";
8 } else {
9 echo date('Y-m-d H:i:s'). "请求接口失败,5分钟后再次尝试\n";
10 }
11 });
示例代码
新建文件\src\App\Task.php:
1 <?php
2 namespace Helloweba\Swoole;
3
4 use swoole_server;
5
6 /**
7 * 任务调度
8 */
9 class Task
10 {
11 protected $serv;
12 protected $host = '127.0.0.1';
13 protected $port = 9506;
14 // 进程名称
15 protected $taskName = 'swooleTask';
16 // PID路径
17 protected $pidPath = '/run/swooletask.pid';
18 // 设置运行时参数
19 protected $options = [
20 'worker_num' => 4, //worker进程数,一般设置为CPU数的1-4倍
21 'daemonize' => true, //启用守护进程
22 'log_file' => '/data/log/swoole-task.log', //指定swoole错误日志文件
23 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
24 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
25 'task_worker_num' => 4, //task进程的数量
26 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
27 ];
28
29 public function __construct($options = [])
30 {
31 date_default_timezone_set('PRC');
32 // 构建Server对象,监听127.0.0.1:9506端口
33 $this->serv = new swoole_server($this->host, $this->port);
34
35 if (!empty($options)) {
36 $this->options = array_merge($this->options, $options);
37 }
38 $this->serv->set($this->options);
39
40 // 注册事件
41 $this->serv->on('Start', [$this, 'onStart']);
42 $this->serv->on('Connect', [$this, 'onConnect']);
43 $this->serv->on('Receive', [$this, 'onReceive']);
44 $this->serv->on('Task', [$this, 'onTask']);
45 $this->serv->on('Finish', [$this, 'onFinish']);
46 $this->serv->on('Close', [$this, 'onClose']);
47 }
48
49 public function start()
50 {
51 // Run worker
52 $this->serv->start();
53 }
54
55 public function onStart($serv)
56 {
57 // 设置进程名
58 cli_set_process_title($this->taskName);
59 //记录进程id,脚本实现自动重启
60 $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
61 file_put_contents($this->pidPath, $pid);
62 }
63
64 //监听连接进入事件
65 public function onConnect($serv, $fd, $from_id)
66 {
67 $serv->send( $fd, "Hello {$fd}!" );
68 }
69
70 // 监听数据接收事件
71 public function onReceive(swoole_server $serv, $fd, $from_id, $data)
72 {
73 echo "Get Message From Client {$fd}:{$data}\n";
74 //$this->writeLog('接收客户端参数:'.$fd .'-'.$data);
75 $res['result'] = 'success';
76 $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
77 $serv->task($data); // 执行异步任务
78 }
79
80 /**
81 * @param $serv swoole_server swoole_server对象
82 * @param $task_id int 任务id
83 * @param $from_id int 投递任务的worker_id
84 * @param $data string 投递的数据
85 */
86 public function onTask(swoole_server $serv, $task_id, $from_id, $data)
87 {
88 swoole_timer_tick(30000, function($timer) use ($task_id) { // 启用定时器,每30秒执行一次
89 $memPercent = $this->getMemoryUsage();
90 echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."\n";
91 });
92 }
93
94
95 /**
96 * @param $serv swoole_server swoole_server对象
97 * @param $task_id int 任务id
98 * @param $data string 任务返回的数据
99 */
100 public function onFinish(swoole_server $serv, $task_id, $data)
101 {
102 //
103 }
104
105
106 // 监听连接关闭事件
107 public function onClose($serv, $fd, $from_id) {
108 echo "Client {$fd} close connection\n";
109 }
110
111 public function stop()
112 {
113 $this->serv->stop();
114 }
115
116 private function getMemoryUsage()
117 {
118 // MEMORY
119 if (false === ($str = @file("/proc/meminfo"))) return false;
120 $str = implode("", $str);
121 preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf);
122 //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers);
123
124 $memTotal = round($buf[1][0]/1024, 2);
125 $memFree = round($buf[2][0]/1024, 2);
126 $memUsed = $memTotal - $memFree;
127 $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;
128
129 return $memPercent;
130 }
131 }
我们以场景一为例,在onTask启用定时任务,每隔30秒计算一次内存使用率。实际应用中可以把计算好的内存按时间写入数据库等存储中,然后可以根据前端需求用来渲染成统计图表,如:
接着服务端代码 public\taskServer.php :
<?php
require dirname(__DIR__) . '/vendor/autoload.php';use Helloweba\Swoole\Task;$opt = [
'daemonize' => false];$ser = new Task($opt);$ser->start();
客户端代码 public\taskClient.php :
<?php
class Client
{
private $client;
public function __construct() {
$this->client = new swoole_client(SWOOLE_SOCK_TCP);
}
public function connect() {
if( !$this->client->connect("127.0.0.1", 9506 , 1) ) {
echo "Error: {$this->client->errMsg}[{$this->client->errCode}]\n";
}
fwrite(STDOUT, "请输入消息 Please input msg:");
$msg = trim(fgets(STDIN));
$this->client->send( $msg );
$message = $this->client->recv();
echo "Get Message From Server:{$message}\n";
}
}
$client = new Client();
$client->connect();
验证效果
1.启动服务端:
php taskServer.php
2.客户端输入:
另开命令行窗口,执行
[[email protected] public]# php taskClient.php 请输入消息 Please input msg:hello
Get Message From Server:{"result":"success"}
[[email protected] public]#
3.服务端返回:
如果返回上图中的结果,则定时任务正常运行,我们会发现每隔30秒会输出一条信息。