SpringBoot2.0集成MQTT功能之消息订阅处理

SpringBoot2.0集成MQTT消息之消息订阅处理

        第一,pom配置,引入相关jar:

  1. <dependency>
  2. <groupId>org.springframework.integration</groupId>
  3. <artifactId>spring-integration-stream</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.integration</groupId>
  7. <artifactId>spring-integration-mqtt</artifactId>
  8. </dependency>    

第二,配置MQTT服务器基本信息,在springBoot配置文件application.properties中配置,添加如下:

  1. #MQTT配置信息
  2. #MQTT-用户名
  3. spring.mqtt.username=admin
  4. #MQTT-密码
  5. spring.mqtt.password=password
  6. #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
  7. spring.mqtt.url=tcp://127.0.0.1:61613
  8. #MQTT-连接服务器默认客户端ID
  9. spring.mqtt.client.id=mqttId
  10. #MQTT-默认的消息推送主题,实际可在调用接口时指定
  11. spring.mqtt.default.topic=topic
  12. #连接超时
  13. spring.mqtt.completionTimeout=3000

第三,配置MQTT消息接收处理类:

  1. /**
  2. * 〈一句话功能简述〉<br>
  3. * 〈MQTT接收消息处理〉
  4. *
  5. * @author lenovo
  6. * @create 2018/6/4
  7. * @since 1.0.0
  8. */
  9. @Configuration
  10. @IntegrationComponentScan
  11. public class MqttReceiveConfig {
  12. @Value("${spring.mqtt.username}")
  13. private String username;
  14. @Value("${spring.mqtt.password}")
  15. private String password;
  16. @Value("${spring.mqtt.url}")
  17. private String hostUrl;
  18. @Value("${spring.mqtt.client.id}")
  19. private String clientId;
  20. @Value("${spring.mqtt.default.topic}")
  21. private String defaultTopic;
  22. @Value("${spring.mqtt.completionTimeout}")
  23. private int completionTimeout ; //连接超时
  24. @Bean
  25. public MqttConnectOptions getMqttConnectOptions(){
  26. MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
  27. mqttConnectOptions.setUserName(username);
  28. mqttConnectOptions.setPassword(password.toCharArray());
  29. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  30. mqttConnectOptions.setKeepAliveInterval(2);
  31. return mqttConnectOptions;
  32. }
  33. @Bean
  34. public MqttPahoClientFactory mqttClientFactory() {
  35. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  36. factory.setConnectionOptions(getMqttConnectOptions());
  37. return factory;
  38. }
  39. //接收通道
  40. @Bean
  41. public MessageChannel mqttInputChannel() {
  42. return new DirectChannel();
  43. }
  44. //配置client,监听的topic
  45. @Bean
  46. public MessageProducer inbound() {
  47. MqttPahoMessageDrivenChannelAdapter adapter =
  48. new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
  49. "hello","hello1");
  50. adapter.setCompletionTimeout(completionTimeout);
  51. adapter.setConverter(new DefaultPahoMessageConverter());
  52. adapter.setQos(1);
  53. adapter.setOutputChannel(mqttInputChannel());
  54. return adapter;
  55. }
  56. //通过通道获取数据
  57. @Bean
  58. @ServiceActivator(inputChannel = "mqttInputChannel")
  59. public MessageHandler handler() {
  60. return new MessageHandler() {
  61. @Override
  62. public void handleMessage(Message<?> message) throws MessagingException {
  63. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
  64. String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
  65. if("hello".equalsIgnoreCase(topic)){
  66. System.out.println("hello,fuckXX,"+message.getPayload().toString());
  67. }else if("hello1".equalsIgnoreCase(topic)){
  68. System.out.println("hello1,fuckXX,"+message.getPayload().toString());
  69. }
  70. }
  71. };
  72. }
  73. }

第四,启动服务测试,使用postment调用上一篇的MQTT发送接口,分别往hello,hello1两个topic发送消息,测试接收情况:

SpringBoot2.0集成MQTT功能之消息订阅处理

SpringBoot2.0集成MQTT功能之消息订阅处理

        由此看出,可以正常监听topic并接收处理消息了。

        看到这里,朋友们可能有疑问,如果我要配置多个client,应该怎么处理呢?这个也简单,我们只要配置多个通道即可,简单代码如下:

  1. //通道2
  2. @Bean
  3. public MessageChannel mqttInputChannelTwo() {
  4. return new DirectChannel();
  5. }
  6. //配置client2,监听的topic:hell2,hello3
  7. @Bean
  8. public MessageProducer inbound1() {
  9. MqttPahoMessageDrivenChannelAdapter adapter =
  10. new MqttPahoMessageDrivenChannelAdapter(clientId+"_inboundTwo", mqttClientFactory(),
  11. "hello2","hello3");
  12. adapter.setCompletionTimeout(completionTimeout);
  13. adapter.setConverter(new DefaultPahoMessageConverter());
  14. adapter.setQos(1);
  15. adapter.setOutputChannel(mqttInputChannelTwo());
  16. return adapter;
  17. }
  18. //通过通道2获取数据
  19. @Bean
  20. @ServiceActivator(inputChannel = "mqttInputChannelTwo")
  21. public MessageHandler handlerTwo() {
  22. return new MessageHandler() {
  23. @Override
  24. public void handleMessage(Message<?> message) throws MessagingException {
  25. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
  26. String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
  27. if("hello2".equalsIgnoreCase(topic)){
  28. System.out.println("hello2 clientTwo,"+message.getPayload().toString());
  29. }else if("hello3".equalsIgnoreCase(topic)){
  30. System.out.println("hello3 clientTwo,"+message.getPayload().toString());
  31. }
  32. }
  33. };
  34. }

        这样一来,我们就配置了两个client,client1监听处理hello、hello1主题消息,client2监听处理hello2、hello3主题,测试一下:

SpringBoot2.0集成MQTT功能之消息订阅处理

         从输出结果可以看出,我们发送不同的消息,分别由不同的client处理。所以,小伙伴,你理解了吗?

        【转载请注明出处——大道迷途】