Integrating RabbitMQ with Spring Boot
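1. Create two Maven projects to simulate two applications talking to each other in a distributed system: one sender (Sender) and one receiver (Receiver).
2. Add the RabbitMQ dependency to the pom of both projects
<!-- Add Spring Boot support for AMQP -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>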
3. Configure the MQ parameters in application.properties
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=scan
spring.rabbitmq.password=scan
spring.rabbitmq.virtual-host=/ws
The username, password, and virtual host can be set up in the management UI [http://localhost:15672/#/vhosts]
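4. Send a message to the queue from the producer
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class WsScanApplicationTests {

    @Autowired
    private RabbitTemplate amqpTemplate;

    @Test
    public void contextLoads() {
        // Build a small batch of two records to send as one message.
        List<ScanData> scanDataList = new ArrayList<>();
        {
            ScanData first = new ScanData();
            first.setAPI_ID("API_ID1111111111");
            first.setAPI_VER("API_VER11111");
            first.setBAG_ID("BAG111111");
            first.setBCH_ID("BRC_ID11111");
            scanDataList.add(first);
        }
        {
            ScanData second = new ScanData();
            second.setAPI_ID("API_ID22222");
            second.setAPI_VER("API_VER2222222");
            second.setBAG_ID("BAG222222222222");
            second.setBCH_ID("BRC_ID22222222");
            scanDataList.add(second);
        }
        // Published to the default exchange with routing key "ws_scan_data",
        // which delivers to the queue of that name (the queue must already exist).
        amqpTemplate.convertAndSend("ws_scan_data", scanDataList);
    }
}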
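The test above relies on a ScanData class that these notes never show. A minimal sketch of what it might look like follows. The four String fields are an assumption inferred from the setters called in the test, and the field names follow those setters rather than the usual Java naming conventions. The real class may have more fields or different types.

```java
import java.io.Serializable;

// Hypothetical payload class: only the four properties used in the test are modeled.
class ScanData implements Serializable {
    private static final long serialVersionUID = 1L;

    // Names mirror the setters used in the test (setAPI_ID, setAPI_VER, ...).
    private String API_ID;
    private String API_VER;
    private String BAG_ID;
    private String BCH_ID;

    public String getAPI_ID() { return API_ID; }
    public void setAPI_ID(String apiId) { this.API_ID = apiId; }

    public String getAPI_VER() { return API_VER; }
    public void setAPI_VER(String apiVer) { this.API_VER = apiVer; }

    public String getBAG_ID() { return BAG_ID; }
    public void setBAG_ID(String bagId) { this.BAG_ID = bagId; }

    public String getBCH_ID() { return BCH_ID; }
    public void setBCH_ID(String bchId) { this.BCH_ID = bchId; }

    @Override
    public String toString() {
        return "ScanData{API_ID=" + API_ID + ", API_VER=" + API_VER
                + ", BAG_ID=" + BAG_ID + ", BCH_ID=" + BCH_ID + "}";
    }
}
```

A no-argument constructor plus getters and setters is the shape JSON converters such as Jackson typically expect, which matters once the JSON message converter from the configuration class is in use.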
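5. Handle the message in the consumer
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WcForWsReceiver {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @RabbitListener(queues = "ws_scan_data")
    public void process(byte[] data) {
        // Decode with an explicit charset instead of the platform default.
        System.out.println("Receiver : " + new String(data, StandardCharsets.UTF_8));
        // Self-consumption demo; normally you would run business logic on the data here.
        amqpTemplate.convertAndSend("ws_exchange", "", data);
    }
}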
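The receiver forwards each message to ws_exchange with an empty routing key. That works because ws_exchange is declared as a fanout exchange in the configuration class, and a fanout exchange copies every message to all queues bound to it regardless of the routing key. The sketch below is a plain-Java, in-memory model of that routing rule, not RabbitMQ or Spring AMQP API. The class name FanoutModel and the queue names used with it are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of fanout routing (illustration only, no broker involved).
class FanoutModel {
    // Queues bound to this exchange, keyed by queue name.
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange, creating it on first use.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Publish a message: the routing key is accepted but never consulted,
    // and every bound queue receives its own copy.
    void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Take the next message from a queue, or null if it is empty or not bound.
    String poll(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

In this model, publishing with "" or with any other key gives the same result, and publishing while no queue is bound silently drops the message. These notes do not show a queue bound to ws_exchange, so a binding would be needed somewhere for the forwarded messages to be consumed.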
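This is the simplest possible test. By default, received messages are acknowledged automatically; manual acknowledgement can also be configured.
Configuring the number of concurrent consumers: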
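import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    /** Number of concurrent consumers per listener. */
    public static final int DEFAULT_CONCURRENT = 4;

    /** Maximum number of unacknowledged messages delivered to each consumer (prefetch count). */
    public static final int DEFAULT_PREFETCH_COUNT = 50;

    @Autowired
    public ConnectionFactory connectionFactory;

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("ws_exchange");
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        // The prefetch constant belongs on the prefetch setting,
        // not on the maximum number of concurrent consumers.
        factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
        return factory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        // Serialize message bodies as JSON.
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}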
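The note before this configuration class mentions automatic versus manual acknowledgement without showing the difference. The sketch below is a plain-Java, in-memory model of the two behaviours, not the RabbitMQ client or Spring AMQP API. It assumes that the automatic mode works the way Spring AMQP's AUTO mode is documented: the container acknowledges after the listener returns normally and requeues the message if the listener throws. AckModelQueue and all of its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// In-memory model of acknowledgement semantics (illustration only).
class AckModelQueue {
    // A delivered message together with the tag used to ack or nack it.
    static final class Delivery {
        final long tag;
        final String body;
        Delivery(long tag, String body) { this.tag = tag; this.body = body; }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String body) { ready.addLast(body); }

    // Manual mode: hand out the next message and hold it as unacknowledged.
    Delivery deliver() {
        String body = ready.pollFirst();
        if (body == null) return null;
        long tag = nextTag++;
        unacked.put(tag, body);
        return new Delivery(tag, body);
    }

    // Positive acknowledgement: the message is gone for good.
    void ack(long tag) { unacked.remove(tag); }

    // Negative acknowledgement with requeue: the message becomes ready again.
    void nackRequeue(long tag) {
        String body = unacked.remove(tag);
        if (body != null) ready.addFirst(body);
    }

    // Automatic mode: ack when the listener returns, requeue when it throws.
    boolean handleAuto(Consumer<String> listener) {
        Delivery d = deliver();
        if (d == null) return false;
        try {
            listener.accept(d.body);
            ack(d.tag);
            return true;
        } catch (RuntimeException e) {
            nackRequeue(d.tag);
            return false;
        }
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }
}
```

In manual mode the caller decides when to call ack or nackRequeue, so a message stays in the unacknowledged set until the business logic has finished. This is also where the prefetch count comes in, since it caps how many such unacknowledged messages a consumer may hold at once.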