2019-4-9 浅谈重启worker和worker意外退出重新拉起
目的
swoole中server->reload重启所有worker。
当某个worker意外退出时,为了维持worker的数量会自动拉起一个新worker。
重点
基于信号,模拟如何实现。当然无法做到workman复杂的信号处理。看看就好,无需深究。
- 注册信号:pcntl_signal(信号,触发信号时的处理函数,FALSE);
- 信号分发:pcntl_signal_dispatch(); 非阻塞!。收到信号时,分发到上边的注册中
- pid=pcntl_wait($status); 阻塞!不过也可以设置为非阻塞 主进程用于接收子进程退出时返回码,如不主动接收,系统不会帮助主进程回收子进程。将此返回码赋值给status变量,如子进程pid为11855的进程,exit(15)或return(15)退出,那么pid=pcntcl_waite(status),pid=11855,status=15。
- posix系列函数,为进程控制函数。 如posix_getpid() 获得当前进程的pid,posix_kill() 向主进程或者子进程发送信号。
- 当然也为大家准备了批量杀死进程的命令,因为测试中难免会出现很多僵尸进程。
ps -ef|grep signal_server|grep -v grep|awk '{print "kill -9 " $2}' |sh
服务器
属性
<?php
class SignalServer
{
protected $_workerPids=[];//保存子进程pids,以实现对子进程的控制
protected $_workerNum=2; //启动的子进程数
protected $_sAddr; //socket地址
protected $_masterPid; //主进程pid
protected $currentWorkerId; //子进程调用,父进程给分配的编号
protected $onConnect; //客户端连接事件
protected $onReceive; //客户端收到信息事件
protected $onClose; //客户端连接关闭事件
protected $onWorkerStart; //work开始工作事件
protected $onWorkerStop; //work停止工作事件
public function __construct($socket_addr){
$this->_sAddr = $socket_addr;
$this->_masterPid = posix_getpid();//获得当前进程pid,因在主进程空间故为主进程pid
echo 'server start begin...'.PHP_EOL;
echo 'start master! pid:'.$this->_masterPid.PHP_EOL;
}
注册回调
/**
* 注册回调函数
* @param [string] $event [事件名称首字母无要求大小写]
* @param [closure] $closure [闭包]
* @return [none or exit] [成功没有返回,错误进程退出]
*/
public function on($event,$closure)
{
if(empty($event) && is_callable($closure)){
echo "parameters error!".PHP_EOL;
exit;
}
$event=lcfirst($event);
switch ($event) {
case 'workerStart':
$this->onWorkerStart = $closure;
break;
case 'workerStop':
$this->onWorkerStop = $closure;
break;
case 'connect':
$this->onConnect = $closure;
break;
case 'receive':
$this->onReceive = $closure;
break;
case 'close':
$this->onClose = $closure;
break;
default:
echo "invalid event!".PHP_EOL;
exit;
break;
}
}
启动初始化
/**
* 创建N个子进程
* 注册信号事件
* 监控主进程程序
* @return [type] [description]
*/
public function start(){
$this->forkWorker($this->_workerNum);
$this->installSignal();
$this->monitorWorkers();
}
目录
/**
* 创建N个子进程
* @param [int] $worker_num [子进程数量]
*/
protected function forkWorker($worker_num)
{
for ($i=0; $i < $worker_num; $i++)
{
$pid=pcntl_fork();
if($pid<0){
exit('fork fail!');
}else if($pid>0){
$this->_workerPids[]=$pid;
echo 'start worker! pid:'.$pid.PHP_EOL;
}else{
$this->currentWorkerId=$i;
$this->accept();
exit; //注意一定要exit,子进程会继续循环,成为嵌套子进程
}
}
echo 'server init end.'.PHP_EOL.PHP_EOL;
}
/**
* 注册信号处理器
*/
protected function installSignal( )
{
//用户自定义信号,-10,用于重启
pcntl_signal(SIGUSR1,[$this,'signalHandler'],FALSE);
//ctrl+c 退出主进程
pcntl_signal(SIGINT,[$this,'signalHandler'],FALSE);
}
/**
* 监控进程
*/
protected function monitorWorkers( )
{
while (true)
{
pcntl_signal_dispatch();
$pid = pcntl_wait($status); //阻塞,所以不用担心while无限循环
pcntl_signal_dispatch();
if($pid>1 && !pcntl_wifexited($status))
{
echo PHP_EOL.'正在检测pid是否意外退出 with status('.$status.') pid:'.$pid.PHP_EOL;
if(in_array($pid,$this->_workerPids)){
call_user_func($this->onWorkerStop);
$index=array_search($pid,$this->_workerPids);
unset($this->_workerPids[$index]);
echo '子进程意外退出,重新拉起..'.PHP_EOL.PHP_EOL;
$this->forkOneWorker($index);
}else{
echo '为主进程重启。'.PHP_EOL.PHP_EOL;
}
}
}
}
目录所需函数
/**
* 创建一个子进程
* @param [int] $index [被重启的worker下标,目的是work_id不变]
*/
protected function forkOneWorker($index)
{
$pid=pcntl_fork();
if($pid<0){
exit('fork fail!');
}else if($pid>0){
echo "启动新worker:".$pid.PHP_EOL;
$this->_workerPids[$index]=$pid;
}else{
$this->currentWorkerId=$index;
$this->accept();//处理连接核心程序
exit;
}
}
/**
* 信号处理程序
* @param [signal] $sig [由信号系统自动注入]
*/
protected function signalHandler($sig)
{
switch ($sig){
case SIGUSR1:
echo PHP_EOL.'收到重启信号SIGUSR1--将会重启所有worker...'.PHP_EOL;
$this->reload();
break;
case SIGINT:
$this->killAll();
break;
}
}
/**
* ctrl+c 全部退出
*/
protected function killAll()
{
$workerPids = $this->_workerPids;
foreach ($workerPids as $index=>$pid){
//var_dump('sigint beforunset',$this->_workerPids);
unset($this->_workerPids[$index]);
//var_dump('sigint afterunset',$this->_workerPids);
echo "结束进程",$pid.'...'.PHP_EOL;
posix_kill($pid,SIGKILL); //结束进程
}
posix_kill($this->_masterPid,SIGKILL);
}
/**
* 重启所有子进程
*/
protected function reload( )
{
$workerPids = $this->_workerPids;
foreach ($workerPids as $index=>$pid){
//var_dump('reload beforunset',$this->_workerPids);
unset($this->_workerPids[$index]);
//var_dump('reload afterunset',$this->_workerPids);
echo "结束进程",$pid.'...'.PHP_EOL;
posix_kill($pid,SIGKILL); //结束进程
$this->forkOneWorker($index);
}
}
套接字服务器
/**
* 套接字服务器
*/
protected function accept()
{
$connext_option=[
'socket'=>[
'backlog'=>10240, //等待处理连接的队列
],
];
$connext=stream_context_create($connext_option);
stream_context_set_option($connext,'socket','so_reuseaddr',1);
stream_context_set_option($connext,'socket','so_reuseport',1);
$main_sock=stream_socket_server($this->_sAddr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$connext);
swoole_event_add($main_sock,function($sfd){
$clnt_sock=stream_socket_accept($sfd);
call_user_func($this->onConnect,$sfd,$clnt_sock);
swoole_event_add($clnt_sock,function($cfd) use ($sfd){
$buf=fread($cfd,1024);
if(empty($buf)){
if(!is_resource($cfd) || feof($cfd) ){
fclose($cfd);
call_user_func($this->onClose,$sfd,$cfd);
}
}else{
call_user_func($this->onReceive,$sfd,$cfd,$buf);
}
});
});
/*$a=include 'work.php';
echo $a.PHP_EOL;*/
call_user_func($this->onWorkerStart,$main_sock,$this->currentWorkerId);
}
客户端
$server=new SignalServer('tcp://0.0.0.0:6001');
$server->on('connect',function($server,$fd){
echo 'client_no:'.(int)$fd.' has connected!'.PHP_EOL;
});
$server->on('workerStart',function($server,$workerId){
echo 'onWorkerStart:worker_id :'.$workerId.' has start!';
$a=include 'work.php';
echo $a.PHP_EOL;
});
$server->on('workerStop',function(){
echo 'onWorkerStop'.PHP_EOL;
opcache_reset();
});
$server->on('receive',function($server,$fd,$data){
$content="谁啊?别请求了,服务器小不禁搞";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //连接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($fd, $http_resonse);
//echo $data.PHP_EOL;
fwrite($fd, 'response');
});
$server->on('close',function($server,$fd){
echo 'client_no:'.(int)$fd.' has closed!'.PHP_EOL;
});
$server->start();
测试
启动
杀死一个子进程
重启所有子进程
对比
ctrl+c退出