Spring Boot -- RabbitMQ
Spring Boot -- RabbitMQ
1. pom.xml
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. application.properties
#RabbitMQ
spring.rabbitmq.host=192.168.1.199
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=localhost
spring.rabbitmq.username=root
spring.rabbitmq.password=root
3. config
@Configuration
public class RabbitMQConfig {
@Bean
public Queue getQueue(){
return new Queue("queue.demo");
}
}
4. 生产者
@Component
public class Producer {
@Resource
private AmqpTemplate amqpTemplate;
public void send(){
String msg = "msg: " + new Date();
amqpTemplate.convertAndSend("queue.demo" ,msg);
System.out.println("Producer send "+ msg);
}
}
5. 消费者
@Component
@RabbitListener(queues = "queue.demo")
public class Consumer {
@RabbitHandler
public void accept(String msg){
System.out.println("Receiver = [" + msg + "]");
}
}
6. 测试
@Test
public void testMQ() throws InterruptedException {
producer.send();
Thread.sleep(20000);
}
发现对象消息
public void send(){
String msg = "msg: " + new Date();
Map<String, Object> map = new HashMap<>();
map.put("name", "xubo");
map.put("age", 12);
amqpTemplate.convertAndSend("queue.demo", map.toString());
System.out.println("Producer send "+ map.toString());
}
一个生产者多个消费者
消费者2
@Component
@RabbitListener(queues = "queue.demo")
public class Consumer2 {
@RabbitHandler
public void accept(String msg){
System.out.println("Consumer2 = [" + msg + "]");
}
}
生产者发送多个消息
public void send(){
for (int i = 0; i < 10; i++){
String msg = "msg: " + new Date();
amqpTemplate.convertAndSend("queue.demo", msg);
System.out.println("Producer send "+ msg);
}
}
一个生产者多个消费者属于队列模式,多个消费者瓜分消息
多个生产者多个消费者
生产者2
@Component
public class Producer2 {
@Resource
private AmqpTemplate amqpTemplate;
public void send(){
for (int i = 0; i < 5; i++){
String msg = "msg: " + i;
amqpTemplate.convertAndSend("queue.demo", msg);
System.out.println("Producer2 send "+ msg);
}
}
}
@Test
public void testMQ() throws InterruptedException {
producer.send();
producer2.send();
Thread.sleep(20000);
}
一个交换机绑定多个路由键
QueueConfig
@Configuration
public class QueueConfig {
@Bean(name = "masterQueue")
@Primary
public Queue masterQueue(){
return new Queue("queue.master");
}
@Bean(name = "clusterQueue")
public Queue clusterQueue(){
return new Queue("queue.cluster");
}
}
TopicRabbitConfig
@Configuration
public class TopicRabbitConfig {
@Bean
public TopicExchange exchange(){
return new TopicExchange("demo.amq.topic");
}
@Bean
public Binding masterBindingExchangeMessage(@Qualifier("masterQueue")Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("master");
}
@Bean
public Binding clusterBindingExchangeMessage(@Qualifier("clusterQueue")Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("cluster");
}
}
Producer
生产者发送消息需要指定交换机和路由键
public void send(){
for (int i = 0; i < 10; i++){
String msg = "msg: " + i;
if(i % 2 == 0){
amqpTemplate.convertAndSend("demo.amq.topic", "master", msg);
System.out.println("Producer to master send "+ msg);
}else {
amqpTemplate.convertAndSend("demo.amq.topic", "cluster", msg);
System.out.println("Producer to cluster send "+ msg);
}
}
}
Consumer
不同消费者要订阅对应的不同的队列
@Component
@RabbitListener(queues = "queue.master")
public class Consumer {
@RabbitHandler
public void accept(String msg){
System.out.println("Consumer1 master = [" + msg + "]");
}
}
@Component
@RabbitListener(queues = "queue.cluster")
public class Consumer2 {
@RabbitHandler
public void accept(String msg){
System.out.println("Consumer2 cluster = [" + msg + "]");
}
}
扇形交换机,广播模式,发布订阅模式
所有绑定扇形交换机的队列都会收到消息
FanoutRabbitConfig
@Configuration
public class FanoutRabbitConfig {
@Bean
public FanoutExchange exchange(){
return new FanoutExchange("demo.amq.fanout");
}
@Bean
public Binding masterBindingExchangeMessage(@Qualifier("masterQueue")Queue queue, FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding clusterBindingExchangeMessage(@Qualifier("clusterQueue")Queue queue, FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
Producer
生产者发送消息针对于扇形交换机不需要路由键
for (int i = 0; i < 10; i++){
String msg = "msg: " + i;
amqpTemplate.convertAndSend("demo.amq.fanout", "", msg);
System.out.println("Producer all send "+ msg);
}