的RabbitMQ - PHP AMQP破破的管道错误

问题描述:

我处理一个巨大的XML文档(包含围绕万条),随后使用RabbitMQ的导入格式化版本的分贝。每次发布大约200,000个条目后,我都会收到损坏的管道错误,而rabbitmq无法从中恢复。的RabbitMQ - PHP AMQP破破的管道错误

注意错误:fwrite():发送2651个字节失败,errno = 11资源暂时不可用[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,第439行]

注意错误:fwrite():33个字节的发送失败,errno = 104在[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,第439行]由对等方重置连接

通知错误:fwrite的():19个字节在发送失败,错误号= 32破碎的管[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,线439]

这随后会导致节点关闭错误,并且该进程需要手动终止才能从中恢复。

这是我的类方法: -

public function publishMessage($message) { 
    if (!isset($this->conn)) { 
     $this->_createNewConnectionAndChannel(); 
    } 
    try { 
     $this->ch->basic_publish(
      new AMQPMessage($message, array('content_type' => 'text/plain')), 
      $this->defaults['exchange']['name'], 
      $this->defaults['binding']['routing_key'] 
     ); 
    } catch (Exception $e) { 
     echo "Caught exception : " . $e->getMessage(); 
     echo "Creating new connection."; 
     $this->_createNewConnectionAndChannel(); 
     $this->publishMessage($message); // try again 
    } 
} 

protected function _createNewConnectionAndChannel() { 
    if (isset($this->conn)) { 
     $this->conn->close(); 
    } 

    if(isset($this->ch)) { 
     $this->ch->close(); 
    } 

    $this->conn = new AMQPConnection(
     $this->defaults['connection']['host'], 
     $this->defaults['connection']['port'], 
     $this->defaults['connection']['user'], 
     $this->defaults['connection']['pass'] 
    ); 
    $this->ch = $this->conn->channel(); 
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true); 
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching 

    $this->ch->queue_declare(
     $this->defaults['queue']['name'], 
     $this->defaults['queue']['passive'], 
     $this->defaults['queue']['durable'], 
     $this->defaults['queue']['exclusive'], 
     $this->defaults['queue']['auto_delete'] 
    ); 

    $this->ch->exchange_declare(
     $this->defaults['exchange']['name'], 
     $this->defaults['exchange']['type'], 
     $this->defaults['exchange']['passive'], 
     $this->defaults['exchange']['durable'], 
     $this->defaults['exchange']['auto_delete'] 
    ); 

    $this->ch->queue_bind(
     $this->defaults['queue']['name'], 
     $this->defaults['exchange']['name'], 
     $this->defaults['binding']['routing_key'] 
    ); 
} 

任何帮助将不胜感激。

+0

死TCP连接,您是否尝试过[PECL AMQP扩展(http://pecl.php.net/packages/amqp)?从以往的经验,这是少得多片状,[更好的维护(https://github.com/bkw/pecl-amqp-official)。 – salathe 2013-04-05 20:20:27

+0

是的..同样的问题。 – 2013-04-07 18:49:33

确保您已经添加的虚拟主机的访问对兔MQ用户。我创建了新用户并忘记了默认使用的“/”主机的设置访问权限。

你可以做,通过管理面板yourhost:15672>管理员>点击用户>查找“设置权限”。

P.S.我假设你的RabbitMQ服务正在运行,用户存在且密码正确。

+0

你说得对。那也是我的情况。谢谢。 – Winston 2016-07-29 07:00:57

其实,当你有你的消息中一个很大的内容和你的消费者花费太多时间只能处理一个消息,这个问题发生,这是问题的反应“确认”兔子和尝试使用另一条消息。

当我有这个问题,例如我尝试“适合”我的消息,因为它是一个产品的工人,每封邮件有一些像1K产品ID,所以我改变为100级的产品和它工作得很好。

你可以阅读更多有关检测与心跳here