我们可以批量使用弹簧集成在蚊子10消息加载组
问题描述:
这是我如何定义我的mqtt连接使用春天integration.i我不知道这是否可能bt我们可以设置一个mqtt订户后得到10消息的加载。现在订阅者在发布消息之后就工作了。我们可以批量使用弹簧集成在蚊子10消息加载组
@Autowired
ConnectorConfig config;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(config.getUrl());
factory.setUserName(config.getUser());
factory.setPassword(config.getPass());
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttRouterChannel());
return adapter;
}
/**this is router**/
@MessageEndpoint
public class MessageRouter {
private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);
static final String ALERT = "ALERT";
static final String READING = "READING";
@Router(inputChannel = "mqttRouterChannel")
public String route(@Header("mqtt_topic") String topic){
String route = null;
switch (topic){
case ALERT:
logger.info("alert message received");
route = "alertTransformerChannel";
break;
case READING:
logger.info("reading message received");
route = "readingTransformerChannel";
break;
}
return route;
}
}
答
我需要批量达10条消息在同一时间
这不是一个MqttPahoMessageDrivenChannelAdapter
责任。由泛美卫生组织客户的性质
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception if a terminal error has occurred, and the client should be
* shut down.
*/
public void messageArrived(String topic, MqttMessage message) throws Exception;
所以,我们不能批他们有这个通道适配器:
我们有MqttCallback
使用这个语义。
我们可以从弹簧集成的角度来建议您的EIP实施。
在你的情况,你应该为AggregatorFactoryBean
@Bean
添加@ServiceActivator
是mqttRouterChannel
之前,发送到路由器之前。
,也许就这么简单:
@Bean
@ServiceActivator(inputChannel = "mqttAggregatorChannel")
AggregatorFactoryBean mqttAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
aggregator.setCorrelationStrategy(m -> 1);
aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000));
aggregator.setOutputChannelName("mqttRouterChannel");
return aggregator;
}
看到Reference Manual的更多信息。
+0
感谢Aggregators听起来像是我的案例解决方案,仍然学习如何使用Aggregators。再次感谢。 – Priyamal
目前还不清楚你在这里问什么。你想让它忽略前10条消息吗?或一次批量处理10组消息? – hardillb
确实,不清楚。分别关闭问题。 –
我需要一次批量处理10条消息组 – Priyamal