springCloud-Stream 消息中间件的学习与使用(简单快速入门使用)

springCloud-Stream 消息中间件的学习与使用(简单快速入门使用)

1.发布与订阅的接口注册定义:
package com.weijs.demo.springcloudstream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface UserMessageEvent {
    //生产(发布者)
    String USER_PRODUCER = "USER_PRODUCER";
    //消费(订阅者)
    String USER_CONSUMER_1 = "USER_CONSUMER_1";//消费1
    String USER_CONSUMER_2 = "USER_CONSUMER_2";//消费2



    @Input(USER_CONSUMER_1)
    MessageChannel userConsumer1();
    @Input(USER_CONSUMER_2)
    SubscribableChannel userConsumer2();

    @Output(USER_PRODUCER)
    MessageChannel userProducer();

}
2.生产者(即发布):

调用接口发布消息

package com.weijs.demo.springcloudstream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@RequestMapping("/user")
@RestController
public class UserProducer {

    @Autowired
    UserMessageEvent userMessageEvent;

    @RequestMapping(value = "/send")
    public void sendBrandAdd(@RequestParam String message, HttpServletRequest request) {
        log.info("========【接口接收MQ发送内容参数】" + message);
        Map<String, Object> map = new HashMap<>();
        map.put("routingKey", "add");//设置routingKey
        boolean result = userMessageEvent.userProducer().send(MessageBuilder.withPayload(message).copyHeaders(map).build());

//        //不需要设置routingKey
//        userMessageEvent.userProducer().send(MessageBuilder.withPayload(message).build());

        if (result) {
            log.info("=======send message seccess!");
        }
    }
}
3.消费者(即订阅者):

这里定义有个订阅者去消费消息,如果配置成功,这2个订阅者都会受到生产者发布的消息

package com.weijs.demo.springcloudstream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserConsumer {

    /**
     * 订阅者1
     * @param message
     */
    @StreamListener(UserMessageEvent.USER_CONSUMER_1)
    public void receive(Message<String> message) {
        log.info("订阅者1========消费消息:{}",message);
        log.info("订阅者1========消费消息payload:{}",message.getPayload());
        log.info("订阅者1========消费消息headers:{}",message.getHeaders());
    }

    /**
     * 订阅者2
     * @param message
     */
    @StreamListener(UserMessageEvent.USER_CONSUMER_2)
    public void receive2(Message<String> message) {
        log.info("订阅者2=========消费消息:{}",message.getPayload());
    }
}
4.配置文件进行对订阅发布的绑定:

如配置文件application.yml,这是队列使用是rabbitMQ,也可以换成kafka,所以配置mq的连接

  #rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  #cloud-stream
  cloud:
    stream:
      bindings:
        USER_PRODUCER: //这个就是接口UserMessageEvent定义的发布者
          content-type: application/json //传输内容的格式
          destination: userExchange //这个就是rabbitmq的交换机exchange
        USER_CONSUMER_1: //这个就是接口UserMessageEvent定义的订阅者1
          content-type: application/json
          destination: userExchange
        USER_CONSUMER_2: //这个就是接口UserMessageEvent定义的订阅者2
          content-type: application/json
          destination: userExchange
5.跑起来的测试:

springCloud-Stream 消息中间件的学习与使用(简单快速入门使用)