经过一段时间不活动后在PHP CLI脚本中运行函数

问题描述:

我使用Symfony2和RabbitMqBundle创建一个将文档发送到ElasticSearch的工作人员。以逐一比率索引文档比使用ElasticSearch批量API慢得多。因此,我创建了一个缓冲区,以千组为单位将文档刷新到ES。代码看起来(有点简化)如下:经过一段时间不活动后在PHP CLI脚本中运行函数

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 
    } 
} 

这一切都很好,但有一个小问题。队列以不可预知的速率填充消息。有时在5分钟内有10万次,有时不会持续数小时。例如,如果有82671个文档在排队,则最后的671个文档在收到另外329个可能需要几个小时的文档之前不会被索引。我希望能够做到以下几点:

警告:科幻代码!这显然是行不通的:

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 
    protected $flushTimer; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 

     // Highly Sci-fi code 
     $this->flushTimer = new Timer(); 
     // Flush buffer after 5 minutes of inactivity. 
     $this->flushTimer->setTimeout(5 * 60); 
     $this->flushTimer->setCallback([$this, 'flush']); 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } else { 
      // Start a timer that will flush the buffer after a timeout. 
      $this->initTimer(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 

     // There are no longer messages to be send, stop the timer. 
     $this->flushTimer->stop(); 
    } 

    protected function initTimer() 
    { 
     // Start or restart timer 
     $this->flushTimer->isRunning() 
      ? $this->flushTimer->reset() 
      : $this->flushTimer->start(); 
    } 
} 

现在,我知道PHP不是事件驱动的局限性。但是,这是2015年,有像ReactPHP的解决方案,所以这应该是可能的权利?对于ØMQ,有this function。什么是解决方案,将为RabbitMQ工作或独立于任何消息队列扩展?

的解决方案,我很怀疑:

  1. crysalead/code。它使用declare(ticks = 1);模拟一个计时器。我不确定这是否是一种高性能和可靠的方法。有任何想法吗?
  2. 我可以运行一个cronjob,每5分钟发布一次'FLUSH'消息到同一个队列,然后在收到这条消息时明确地刷新缓冲区,但那会是作弊。
+0

不完全是你在找什么,但可能是一个很好的解决方案。 存储上次运行'flush'命令的时间,以及添加文档时还要检查时间。如果它已经超过5分钟冲洗。 第二好的选择是cronjob恕我直言 –

+0

问题是,当你长时间没有收到任何消息时,你不能检查时间,因此缓冲区不会被刷新。 cronjob在不同的进程中运行PHP,因此不能访问缓冲区。 – Xatoo

+0

因此,您在长时间运行的PHP进程中运行该代码?在这种情况下,你可能会使用信号(就像你的号码1选项一样)看看[这里](http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal )和[这里](http://www.hackingwithphp。COM/16/1/2 /定时您-信号)。这些信号是非阻塞的,我自己还没有使用它,但它可能只是你的用例需要的东西。 –

正如我在我的评论中提到的,您可以使用这些信号。 PHP允许您向脚本信号注册信号处理程序(即SIGINT,SIGKILL等)

对于您的用例,您可以使用SIGALRM信号。这个信号会在一段时间(可以设置)过期后报警你的脚本。这些信号的积极方面是它们是非阻塞的。换句话说,脚本的正常操作不会受到干扰。

调整后的解决方案(蜱因为PHP 5.3不建议使用):

function signal_handler($signal) { 
    // You would flush here 
    print "Caught SIGALRM\n"; 
    // Set the SIGALRM timer again or it won't trigger again 
    pcntl_alarm(300); 
} 

// register your handler with the SIGALRM signal 
pcntl_signal(SIGALRM, "signal_handler", true); 
// set the timeout for the SIGALRM signal to 300 seconds 
pcntl_alarm(300); 

// start loop and check for pending signals 
while(pcntl_signal_dispatch() && your_loop_condition) { 
    //Execute your code here 
} 

注意:你只能在你的脚本中使用1个SIGALRM信号,如果你设置你的信号的时间pcntl_alarm计时器您警报将复位(不触发信号)到其新设定的值。

+0

是的,和我在我的问题中提到的“crysalead/code”项目一样。但是,它使用“声明滴答声”,我怀疑在每个语句之后中断PHP执行是否是一个高性能解决方案。你有什么经验吗? – Xatoo

+0

另外,您提供的链接指向解释使用滴答的页面的链接已弃用。大多数提到的蜱都表示使用蜱在大多数情况下是反模式。因此我感兴趣是否有其他选择。 – Xatoo

+0

您的权利,我忽略了表示滴答的部分已被弃用。我做了一些挖掘,发现了一个不推荐的选择。通过使用'pcntl_signal_dispatch()',您可以确定自己何时检查待处理的信号i.s.o.每次打勾运行处理程序。调整后的解决方案也会更高性能。希望这有助于。 –