Spring MqttPahoMessageDrivenChannelAdapter: Lost connection: Connection lost; retrying
We are using Spring's message-driven-channel-adapter to subscribe to MQTT topics, but we frequently get the error below. I have tested the connection with a JavaScript client (mqttws31.js) and it works fine, so there does not seem to be a connectivity problem.
Error:
org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost
SEVERE: Lost connection:Connection lost; retrying...
MQTT message:
[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0,
id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}]
Configuration:
<bean id="clientFactory"
      class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}" />
    <property name="password" value="${mqtt.password}" />
</bean>
<int-mqtt:message-driven-channel-adapter
    id="mqttInbound" client-id="${mqtt.default.client.id}" url="${mqtt.url}"
    topics="${topics}" client-factory="clientFactory" auto-startup="true"
    channel="output" error-channel="errorChannel" />
<int:channel id="output" />
<int:channel id="errorChannel" />
<int:service-activator input-channel="errorChannel"
    ref="errorMessageLogger" method="logError" />
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" />
<int:service-activator input-channel="output"
    method="handleMessage" ref="mqttLogger" />
<bean id="mqttLogger" class="com.mqtt.MqttReciever" />
pom.xml:
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>4.2.2.RELEASE</version>
</dependency>
While debugging org.eclipse.paho.client.mqttv3-1.1.1-sources.jar:
CommsReceiver.java
public void run() {
    final String methodName = "run";
    MqttToken token = null;
    while (running && (in != null)) {
        try {
            //@TRACE 852=network read message
            log.fine(CLASS_NAME,methodName,"852");
            receiving = in.available() > 0;
            MqttWireMessage message = in.readMqttWireMessage();
            receiving = false;
            // instanceof checks if message is null
            if (message instanceof MqttAck) {
                token = tokenStore.getToken(message);
                if (token!=null) {
                    synchronized (token) {
                        // Ensure the notify processing is done under a lock on the token
                        // This ensures that the send processing can complete before the
                        // receive processing starts! (request and ack and ack processing
                        // can occur before request processing is complete if not!
                        clientState.notifyReceivedAck((MqttAck)message);
                    }
                } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                    //This is an ack for a message we no longer have a ticket for.
                    //This probably means we already received this message and it's being send again
                    //because of timeouts, crashes, disconnects, restarts etc.
                    //It should be safe to ignore these unexpected messages.
                    log.fine(CLASS_NAME, methodName, "857");
                } else {
                    // It its an ack and there is no token then something is not right.
                    // An ack should always have a token assoicated with it.
                    throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                }
            } else {
                if (message != null) {
                    // A new message has arrived
                    clientState.notifyReceivedMsg(message);
                }
            }
        }
        catch (MqttException ex) {
            //@TRACE 856=Stopping, MQttException
            log.fine(CLASS_NAME,methodName,"856",null,ex);
            running = false;
            // Token maybe null but that is handled in shutdown
            clientComms.shutdownConnection(token, ex);
        }
        catch (IOException ioe) {
            //@TRACE 853=Stopping due to IOException
            log.fine(CLASS_NAME,methodName,"853");
            running = false;
            // An EOFException could be raised if the broker processes the
            // DISCONNECT and ends the socket before we complete. As such,
            // only shutdown the connection if we're not already shutting down.
            if (!clientComms.isDisconnecting()) {
                clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
            }
        }
        finally {
            receiving = false;
        }
    }
    //@TRACE 854=<
    log.fine(CLASS_NAME,methodName,"854");
}
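In the method above, in.readMqttWireMessage() sometimes throws an IOException. The catch block then calls clientComms.shutdownConnection(token, ...) with REASON_CODE_CONNECTION_LOST, which is what triggers the reconnect.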
Just sharing this in case it helps... I had the same exception and fixed it by making sure a unique client ID is generated (with MqttAsyncClient.generateClientId()), as described here: https://github.com/eclipse/paho.mqtt.java/issues/207#issuecomment-338246879
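Some background on why a unique ID can matter: under the MQTT specification, a broker disconnects an already connected client when a second client connects with the same client ID, so two processes sharing one ID can knock each other offline repeatedly. Below is a minimal sketch of building a unique ID per instance with only the JDK. The class `ClientIds`, its `unique` method, and the 10-character prefix limit are hypothetical names and choices made for illustration; they are not part of Paho or Spring Integration, and `MqttAsyncClient.generateClientId()` remains the approach the answer above actually used.

```java
import java.util.UUID;

// Hypothetical helper for illustration; not part of Paho or Spring Integration.
final class ClientIds {

    // MQTT 3.1 limits client identifiers to 23 characters; some brokers accept more.
    static final int MAX_LENGTH = 23;

    // Assumption: keep the prefix short so enough random characters remain.
    static final int MAX_PREFIX_LENGTH = 10;

    private ClientIds() {
    }

    // Returns "<prefix>-<random hex>", exactly MAX_LENGTH characters long.
    static String unique(String prefix) {
        if (prefix == null || prefix.isEmpty() || prefix.length() > MAX_PREFIX_LENGTH) {
            throw new IllegalArgumentException(
                    "prefix must be 1.." + MAX_PREFIX_LENGTH + " characters");
        }
        // 32 hex characters once the dashes are removed.
        String random = UUID.randomUUID().toString().replace("-", "");
        int randomLength = MAX_LENGTH - prefix.length() - 1;
        return prefix + "-" + random.substring(0, randomLength);
    }

    public static void main(String[] args) {
        // Two calls should print two different identifiers.
        System.out.println(unique("gw"));
        System.out.println(unique("gw"));
    }
}
```

The resulting value would then be supplied wherever the adapter's client-id is configured, in place of a fixed ${mqtt.default.client.id} shared by several running instances or test clients.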
Thanks for your answer; I was already doing the same :) – HybrisFreelance
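But you still haven't really described a problem. You show a received message above, so it must be working for you. Paho is detecting a connection problem and notifying Spring Integration, which will then reconnect.
You can get full information about the exception by adding an ApplicationListener to your application.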
@Bean
public ApplicationListener<?> eventListener() {
    return new ApplicationListener<MqttConnectionFailedEvent>() {
        @Override
        public void onApplicationEvent(MqttConnectionFailedEvent event) {
            event.getCause().printStackTrace();
        }
    };
}
Result:
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:164)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:116)
... 1 more
(when I shut down the broker).
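The innermost frame in that trace is DataInputStream.readByte, which throws EOFException when the underlying stream has ended. The following JDK-only sketch reproduces just that behaviour; an empty in-memory stream stands in for a socket whose remote end has closed, and the class name `EofDemo` and its method are hypothetical, chosen for illustration. It does not involve Paho at all.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical demo class; it only illustrates the JDK behaviour seen in the trace.
final class EofDemo {

    private EofDemo() {
    }

    // Tries to read one byte from an exhausted stream and reports what happened.
    static String readFromClosedPeer() {
        // An empty stream stands in for a socket closed by the other side.
        try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(new byte[0]))) {
            in.readByte();
            return "no exception";
        } catch (EOFException e) {
            // EOFException is a subclass of IOException, so a catch (IOException)
            // block such as the one in CommsReceiver.run() also handles it.
            return "EOFException";
        } catch (IOException e) {
            return "IOException";
        }
    }

    public static void main(String[] args) {
        System.out.println(readFromClosedPeer());
    }
}
```

In other words, an EOFException at this point indicates that the stream ended from the client's point of view, which is consistent with the broker (or something between client and broker) closing the connection.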
If you think there is a problem with the Paho client, you should raise an issue against that project.
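'But you still haven't really described a problem' - the problem was the frequent "Connection lost; retrying...", which is not what I expected. But now it works fine. I switched to the latest mqtt (4.3.10.RELEASE) and ran a maven clean/install. It now works as expected, with no lost connection errors. Thanks for your time. – HybrisFreelance
Your question is not clear. If the connection is lost, it means something is wrong with the connection, perhaps a network error. The adapter will attempt to reconnect.
Please see the updated question. I have tested the connection with a JavaScript client and it works fine. – HybrisFreelance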