如何通过故障转移传输处理Activemq的最大帧大小异常

问题描述:

我正在开发一个使用activemq来交换消息的应用程序,这个消息有点大,我想要取消。如何通过故障转移传输处理Activemq的最大帧大小异常

我们使用带有两个ActiveMQ实例(主/从)的activemq故障转移传输。代理本身具有100mb的消息框架限制。

问题是:如果我尝试发送大于100mb的消息,ActiveMQ服务器将关闭连接。此时,故障转移传输将尝试重新连接并再次发送消息,从而创建无限循环。

客户端登录如下:

2017-01-05 09:19:11.910 WARN 14680 --- [0.1:[email protected]] o.a.a.t.failover.FailoverTransport  : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {} 

java.io.EOFException: null 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91] 
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4] 
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4] 
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4] 
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91] 

2017-01-05 09:19:11.921 INFO 14680 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport  : Successfully reconnected to tcp://localhost:61616 
2017-01-05 09:19:11.923 WARN 14680 --- [0.1:[email protected]] o.a.a.t.failover.FailoverTransport  : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {} 

java.io.EOFException: null 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91] 
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4] 
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4] 
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4] 
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91] 

虽然ActiveMQ的实例日志:

2017-01-05 09:19:11,909 | WARN | Transport Connection to: tcp://127.0.0.1:57025 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:[email protected] 
2017-01-05 09:19:11,922 | WARN | Transport Connection to: tcp://127.0.0.1:57026 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:[email protected] 

我试图建立一个TransportListener来验证,如果我能捕捉到这种情况,但我只收到transportInterupted事件,没有任何分类器。

我阅读了有关故障转移传输的文档(http://activemq.apache.org/failover-transport-reference.html),也许我可以使用maxReconnectAttempts,但我知道在更常见的情况下(如服务器暂时不可用)会有一些缺陷。

如何检测这种情况并避免客户端和服务器之间的无限连接循环?

至于你说的这种方式

maxReconnectAttempts -1 | 0从ActiveMQ 5.6:默认是-1,永久重试。 0意味着禁用重新连接,例如:只是尝试连接一次。在ActiveMQ之前 5.6:默认为0,请重试。所有ActiveMQ版本:值> 0表示错误发送回客户端之前的重新连接尝试的最大次数为 。

所以,如果你想失败重试后要通知你的传输监听传输失败的,由于你的信息,你需要在那之后maxReconnectAttempts设置的值> 0时的最大重试次数达到的方法onException的大小您的传输侦听器将被调用IOException作为参数,但正如您所说的,验证它是否由于最大值或其他问题并不容易。

如果你想通过JMX访问它来检查发送你可以得到maxFrameSize在运行时的代理方的URI配置之前提议的消息大小并获得BrokerViewMBean实例,并调用getTransportConnectorByType方法http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/broker/jmx/BrokerViewMBean.html#line.304这将返回uri在activemq.xml中配置,您可以解析它以检索maxFrameSize。

JMXServiceURL url = new  JMXServiceURL("service:jmx:rmi:///jndi/rmi://hist:1099/jmxrmi"); 
JMXConnector jmxc = JMXConnectorFactory.connect(url); 
MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

ObjectName activeMq = new ObjectName("org.apache.activemq:Type=Broker,BrokerName=localhost"); 

BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, true); 
String uri = mbean.getTransportConnectorByType("tcp");// or ("ssl") 
String[] pairs = uri.split("&"); 
for (String pair : pairs) { 
    if (pair.startsWith("wireFormat.maxFrameSize")) { 
     int idx = pair.indexOf("="); 
     System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8")); 
    } 
} 

http://activemq.apache.org/maven/apidocs/org/apache/activemq/broker/jmx/BrokerViewMBean.html#getTransportConnectors--会返回地图运输名称作为关键字和URI的作为值

有消息的更好的大小,你可以这样做:

 OpenWireFormat opf = new OpenWireFormat(); 
     opf.setTightEncodingEnabled(true); 
     ByteSequence tab = opf.marshal(message); 
     System.out.println(tab.length); 

您的企业必须是像这样:

import java.io.IOException; 
import java.net.URLDecoder; 
import java.util.HashMap; 
import java.util.Map; 

import javax.jms.Connection; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.management.MBeanServerConnection; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.ObjectName; 
import javax.management.remote.JMXConnector; 
import javax.management.remote.JMXConnectorFactory; 
import javax.management.remote.JMXServiceURL; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.broker.jmx.BrokerViewMBean; 
import org.apache.activemq.command.ActiveMQTextMessage; 
import org.apache.activemq.openwire.OpenWireFormat; 
import org.apache.activemq.transport.TransportFilter; 
import org.apache.activemq.transport.TransportListener; 
import org.apache.activemq.transport.failover.FailoverTransport; 
import org.apache.activemq.util.ByteSequence; 

public class SimpleSenderMaxSizeManager { 

    private static Connection conn = null; 
    private static boolean transportChanged; 
    private static Long MAX_FRAME_SIZE; 

    public static void main(String[] args) throws JMSException { 
     try { 
      SimpleSenderMaxSizeManager.updateMaxSize("host1"); 
      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
        "failover:(tcp://host1:5670,tcp://host2:5671)?randomize=false"); 
      cf.setTransportListener(new TransportListener() { 

       @Override 
       public void transportResumed() { 
        if (transportChanged) { 
         transportChanged = false; 
         try { 
          SimpleSenderMaxSizeManager.updateMaxSize(null); 
         } catch (Exception e) { 
         } 
        } 
       } 

       @Override 
       public void transportInterupted() { 
        transportChanged = true; 
       } 

       @Override 
       public void onException(IOException error) { 
       } 

       @Override 
       public void onCommand(Object command) { 
       } 
      }); 
      conn = cf.createConnection(); 
      ActiveMQSession session = (ActiveMQSession) conn.createSession(false, 
        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
      MessageProducer producer = session.createProducer(session.createQueue("TEST")); 
      conn.start(); 
      ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("test"); 
      OpenWireFormat opf = new OpenWireFormat(); 
      opf.setTightEncodingEnabled(true); 
      ByteSequence tab = opf.marshal(msg); 
      System.out.println(tab.length); 
      if (tab.length >= MAX_FRAME_SIZE) { 
       throw new RuntimeException(tab.length + ">=" + MAX_FRAME_SIZE); 
      } 
      producer.send(msg); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (conn != null) { 
       try { 
        conn.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 

    protected static void updateMaxSize(String host) throws Exception { 
     JMXConnector jmxc = null; 
     try { 
      String jmxHost = host; 
      String scheme = null; 
      if (conn == null) { 
       scheme = "tcp"; 
      } else { 
       org.apache.activemq.transport.TransportFilter responseCorrelator = (TransportFilter) ((ActiveMQConnection) conn) 
         .getTransport(); 
       TransportFilter mutexTransport = (TransportFilter) responseCorrelator.getNext(); 
       FailoverTransport failoverTransport = (FailoverTransport) mutexTransport.getNext(); 
       while (failoverTransport.getConnectedTransportURI() == null) { 
        try { 
         Thread.sleep(100); 
        } catch (Exception e) { 
        } 
       } 
       scheme = failoverTransport.getConnectedTransportURI().getScheme(); 
       jmxHost = failoverTransport.getConnectedTransportURI().getHost(); 
      } 
      JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + jmxHost + ":1099/jmxrmi"); 
      Map<String, String[]> env = new HashMap<>(); 
      String[] creds = { "admin", "admin" }; 
      env.put(JMXConnector.CREDENTIALS, creds); 
      jmxc = JMXConnectorFactory.connect(url, env); 
      MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 
      ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); 
      BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, 
        true); 
      String value = mbean.getTransportConnectorByType(scheme); 
      String[] pairs = value.split("&"); 
      for (String pair : pairs) { 
       if (pair.contains("wireFormat.maxFrameSize")) { 
        int idx = pair.indexOf("="); 
        System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8")); 
        MAX_FRAME_SIZE = Long.valueOf(URLDecoder.decode(pair.substring(idx + 1), "UTF-8")); 
        MAX_FRAME_SIZE -= 1000;// security for JMS headers added by 
              // session on sending 
       } 
      } 
     } finally { 
      if (jmxc != null) { 
       try { 
        jmxc.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 
} 
+0

相当不错的答案。我认为解决方案将通过JMX检查正确的框架。问题:opf.marshal(message);将序列化整个消息只是为了获得它的大小?如果我有一个巨大的消息大小,这可能是一个缺点(尽管我使用getBytes(“UTF-8”)仅用于测试。 –

+0

是啊opf.marshal(消息);是AMQ客户端发送之前所使用的将整个消息与头部编组在一起,以获得接近真实的大小。在所有情况下,AMQ会稍后使用此方法,并且我认为这对您是否有用以确定它是否正常工作 –

我不相信这是可能的。您正在尝试对错误处理进行分类,以查找不会因故障转移而发生故障的异常:传输。如果超出最大客户端数量,则可能会发生相同类型的异常。

在发送声音之前检查消息大小是否可行。

是否有原因尺寸检查不符合您的要求?

public String mySendMessage(String body) { 
.... 
if(body.length > MAX_ALLOWED) .. 
    throw new Exception.. or log.. or other 
else 
    producer.send(session.createTextMessage(body)); 
+0

这个问题对我来说是w e在客户端驱动程序和activemq之间创建一个我无法处理或干扰的循环。 –

+0

关于您的问题:checkint问题在发送之前,消息的大小是我不知道activemq实例允许的最大大小是多少。在其他作品中,我可以更改activemq的配置,但我也需要更改应用程序的配置。它更容易出错。 –

+0

如何获取邮件有效负载的大小? (请参阅我的编辑上面的一些JMS-API伪代码) –