经过一段时间不活动后在PHP CLI脚本中运行函数
我使用Symfony2和RabbitMqBundle创建一个将文档发送到ElasticSearch的工作人员。以逐一比率索引文档比使用ElasticSearch批量API慢得多。因此,我创建了一个缓冲区,以千组为单位将文档刷新到ES。代码看起来(有点简化)如下:经过一段时间不活动后在PHP CLI脚本中运行函数
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
}
}
这一切都很好,但有一个小问题。队列以不可预知的速率填充消息。有时在5分钟内有10万次,有时不会持续数小时。例如,如果有82671个文档在排队,则最后的671个文档在收到另外329个可能需要几个小时的文档之前不会被索引。我希望能够做到以下几点:
警告:科幻代码!这显然是行不通的:
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
protected $flushTimer;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
// Highly Sci-fi code
$this->flushTimer = new Timer();
// Flush buffer after 5 minutes of inactivity.
$this->flushTimer->setTimeout(5 * 60);
$this->flushTimer->setCallback([$this, 'flush']);
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
} else {
// Start a timer that will flush the buffer after a timeout.
$this->initTimer();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
// There are no longer messages to be send, stop the timer.
$this->flushTimer->stop();
}
protected function initTimer()
{
// Start or restart timer
$this->flushTimer->isRunning()
? $this->flushTimer->reset()
: $this->flushTimer->start();
}
}
现在,我知道PHP不是事件驱动的局限性。但是,这是2015年,有像ReactPHP的解决方案,所以这应该是可能的权利?对于ØMQ,有this function。什么是解决方案,将为RabbitMQ工作或独立于任何消息队列扩展?
的解决方案,我很怀疑:
- 有crysalead/code。它使用
declare(ticks = 1);
模拟一个计时器。我不确定这是否是一种高性能和可靠的方法。有任何想法吗? - 我可以运行一个cronjob,每5分钟发布一次'FLUSH'消息到同一个队列,然后在收到这条消息时明确地刷新缓冲区,但那会是作弊。
正如我在我的评论中提到的,您可以使用这些信号。 PHP允许您向脚本信号注册信号处理程序(即SIGINT,SIGKILL等)
对于您的用例,您可以使用SIGALRM信号。这个信号会在一段时间(可以设置)过期后报警你的脚本。这些信号的积极方面是它们是非阻塞的。换句话说,脚本的正常操作不会受到干扰。
调整后的解决方案(蜱因为PHP 5.3不建议使用):
function signal_handler($signal) {
// You would flush here
print "Caught SIGALRM\n";
// Set the SIGALRM timer again or it won't trigger again
pcntl_alarm(300);
}
// register your handler with the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);
// set the timeout for the SIGALRM signal to 300 seconds
pcntl_alarm(300);
// start loop and check for pending signals
while(pcntl_signal_dispatch() && your_loop_condition) {
//Execute your code here
}
注意:你只能在你的脚本中使用1个SIGALRM信号,如果你设置你的信号的时间pcntl_alarm
计时器您警报将复位(不触发信号)到其新设定的值。
是的,和我在我的问题中提到的“crysalead/code”项目一样。但是,它使用“声明滴答声”,我怀疑在每个语句之后中断PHP执行是否是一个高性能解决方案。你有什么经验吗? – Xatoo
另外,您提供的链接指向解释使用滴答的页面的链接已弃用。大多数提到的蜱都表示使用蜱在大多数情况下是反模式。因此我感兴趣是否有其他选择。 – Xatoo
您的权利,我忽略了表示滴答的部分已被弃用。我做了一些挖掘,发现了一个不推荐的选择。通过使用'pcntl_signal_dispatch()',您可以确定自己何时检查待处理的信号i.s.o.每次打勾运行处理程序。调整后的解决方案也会更高性能。希望这有助于。 –
不完全是你在找什么,但可能是一个很好的解决方案。 存储上次运行'flush'命令的时间,以及添加文档时还要检查时间。如果它已经超过5分钟冲洗。 第二好的选择是cronjob恕我直言 –
问题是,当你长时间没有收到任何消息时,你不能检查时间,因此缓冲区不会被刷新。 cronjob在不同的进程中运行PHP,因此不能访问缓冲区。 – Xatoo
因此,您在长时间运行的PHP进程中运行该代码?在这种情况下,你可能会使用信号(就像你的号码1选项一样)看看[这里](http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal )和[这里](http://www.hackingwithphp。COM/16/1/2 /定时您-信号)。这些信号是非阻塞的,我自己还没有使用它,但它可能只是你的用例需要的东西。 –