使用代理网络中的临时队列的请求/应答模式的ActiveMQ/Camel故障切换 - 无法发布到已删除的临时队列

问题描述:

由于缩放原因,我们最近从单个ActiveMQ代理切换到代理网络。尽管绝大多数情况下一切正常,但我们在白天全新部署代理后遇到了一个奇怪的问题:使用代理网络中的临时队列的请求/应答模式的ActiveMQ/Camel故障切换 - 无法发布到已删除的临时队列

首先,我们使用的是ActiveMQ 5.12.1和Camel 2.13。 4用于java方法和JMS端点之间的集成。代理端是使用以下配置

<broker useJmx="${activemq.expose.jmx}" persistent="false" 
    brokerName="${activemq.brokerName}" xmlns="http://activemq.apache.org/schema/core"> 
    <sslContext> 
     <amq:sslContext keyStore="${activemq.broker.keyStore}" 
      keyStorePassword="${activemq.broker.keyStorePassword}" 
      trustStore="${activemq.broker.trustStore}" 
      trustStorePassword="${activemq.broker.trustStorePassword}" /> 
    </sslContext> 
    <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage limit="${activemq.memoryUsage}" /> 
      </memoryUsage> 
      <tempUsage> 
       <tempUsage limit="${activemq.tempUsage}" /> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 
    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
       <policyEntry queue=">" enableAudit="false"> 
        <networkBridgeFilterFactory> 
         <conditionalNetworkBridgeFilterFactory 
          replayWhenNoConsumers="true" /> 
        </networkBridgeFilterFactory> 
       </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 
    <networkConnectors> 
     <networkConnector name="queues" 
      uri="static:(${activemq.otherBrokers})" 
      networkTTL="2" dynamicOnly="true" 
      decreaseNetworkConsumerPriority="true" 
      conduitSubscriptions="false"> 
      <excludedDestinations> 
       <topic physicalName=">" /> 
      </excludedDestinations> 
     </networkConnector> 
     <networkConnector name="topics" 
      uri="static:(${activemq.otherBrokers})" 
      networkTTL="1" dynamicOnly="true" 
      decreaseNetworkConsumerPriority="true" 
      conduitSubscriptions="true"> 
      <excludedDestinations> 
       <queue physicalName=">" /> 
      </excludedDestinations> 
     </networkConnector> 
    </networkConnectors> 
    <transportConnectors> 
     <transportConnector 
      uri="${activemq.protocol}${activemq.host}:${activemq.tcp.port}?needClientAuth=true" 
      updateClusterClients="true" rebalanceClusterClients="true" /> 
     <transportConnector 
      uri="${activemq.websocket.protocol}${activemq.websocket.host}:${activemq.websocket.port}?needClientAuth=true" 
      updateClusterClients="true" rebalanceClusterClients="true" /> 
    </transportConnectors> 
</broker> 

具有以下占位符目前由3个成员的代理网络值

activemq.tcp.port=9000 
activemq.protocol=ssl:// 
activemq.brokerName=activemq-server1.com 
activemq.expose.jmx=true 
activemq.otherBrokers=ssl://server2.com:9000,ssl://server3.com:9000 
activemq.websocket.port=9001 
activemq.websocket.protocol=stomp+ssl:// 
activemq.websocket.host=server1.com 
activemq.memoryUsage=1gb 
activemq.tempUsage=1gb 

在客户端被用于下面的骆驼配置

<bean id="xxx.activemq.redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 
    <property name="maximumRedeliveries" value="0" /> 
</bean> 

<bean id="xxx.activemq.jmsConnectionFactory" class="org.apache.activemq.ActiveMQSslConnectionFactory"> 
    <property name="trustStore" value="${activemq.broker.trustStore}" /> 
    <property name="trustStorePassword" value="${activemq.broker.trustStorePassword}" /> 
    <property name="keyStore" value="${activemq.broker.keyStore}" /> 
    <property name="keyStorePassword" value="${activemq.broker.keyStorePassword}" /> 
    <property name="brokerURL" value="${activemq.broker.url}" /> 
    <property name="redeliveryPolicy" ref="xxx.activemq.redeliveryPolicy" /> 
</bean> 

<bean id="xxx.activemq.jmsConfiguration" class="org.apache.activemq.camel.component.ActiveMQConfiguration"> 
    <property name="receiveTimeout" value="6000" /> 
    <property name="connectionFactory" ref="xxx.activemq.pooledConnectionFactory" /> 
</bean> 

<bean id="xxx.activemq.pooledConnectionFactory" 
    class="org.apache.activemq.pool.PooledConnectionFactory" 
    init-method="start" destroy-method="stop"> 
    <property name="maxConnections" value="8" /> 
    <property name="idleTimeout" value="0" /> 
    <property name="timeBetweenExpirationCheckMillis" 
     value="10000" /> 
    <property name="connectionFactory" 
     ref="xxx.activemq.jmsConnectionFactory" /> 
</bean> 

<bean id="xxx.activemq.jms.abstractComponent" abstract="true" 
    class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" 
     ref="xxx.activemq.jmsConfiguration" /> 
    <property name="connectionFactory" 
     ref="xxx.activemq.pooledConnectionFactory" /> 
    <property name="allowNullBody" value="true" /> 
    <property name="transferException" value="true" /> 
    <property name="defaultTaskExecutorType" 
     value="#{T(org.apache.camel.component.jms.DefaultTaskExecutorType).ThreadPool}" /> 
    <property name="requestTimeout" value="5000" /> 
</bean> 

<bean id="xxx.activemq.jms.queue" 
    parent="xxx.activemq.jms.abstractComponent"> 
    <property name="concurrentConsumers" value="2" /> 
    <property name="maxConcurrentConsumers" value="2" /> 
</bean> 

连接网址为

activemq.broker.url=failover:(ssl://server1.com:9000,ssl://server2.com:9000,ssl://server3.com:9000)?randomize=true 

请求/回复EIP是通过让生产者设置一个相应的jmsReplyTo头并使驼峰默认使用临时队列的InOut来实现的。

在部署之前,所有消息传递都按预期工作,但是之后对于某些请求/应答队列,我们​​将开始在生产者端获得超时。

在制片方

Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-xxx-intranet-phs-49404-1457684675710-8-11 not received on destination: temp-queue://ID:xxx.intranet.phs-41986-1457684806758-1:3:1. 
Exchange[Message: BeanInvocation public abstract xxx.xxx.rapi.dto.RemoteDTO xxx.xxx.xxx.facade.RemoteFacade.findRemoteDTO(java.lang.String,java.lang.Long) with [xxx, 31333]]] 

,并在消费者方面:

Caused by: javax.jms.InvalidDestinationException: Cannot publish to a deleted Destination: temp-queue://ID:xxx.intranet.phs-41986-1457684806758-1:3:1 

我们已经从那以后做了一些研究,发现问题会出现下面的条目在日志中出现了每当网络的任意代理关闭时,然后只对那些在关机命中时有临时队列打开以供回复并且故障转移到新代理的生产者。之后这个问题会持续存在,直到他重新启动。一旦他重新开始后重新开始,一切都恢复正常。该问题也在grokbase以及activemq-failover-with-temporary-queues-on-a-network-of-brokersactivemq-how-to-handle-broker-failovers-while-using-temporary-queues这两个主题上进行了描述。我们已经尝试了activemq-how-to-handle-broker-failovers-while-using-temporary-queues中给出的一个解决方案来设置缓存超时,但没有从中获得任何结果,另一个建议的为客户提供咨询监听的选项在我们的设置中并不是真正的选择,因为我们希望利用功能如clusterRebalancing,以便在运行时更容易将其他代理添加到网络中。

我们还发现了一些关于骆驼和ActiveMQ方面的JIRA问题,如CAMEL-3193描述了这个问题,并且明显地修复了我们更新的版本,所以我们很困惑。目前我们正在考虑通过临时队列切换到独占应答队列来解决这个问题,但首先想问问我们是否可能在某处丢失了一些配置。

如果您需要任何附加信息,请直接询问!

请参阅AMQ-5469:临时队列在建立它们的连接断开时被删除。当您关闭代理时会发生这种情况。

该解决方案可以是两个选项之一:

  • 重新创建重新建立代理连接
  • 不要使用临时队列的临时队列客户端,使用持久队列