Integrating Spring Boot with RabbitMQ

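1. Create two Maven projects to simulate two applications in a distributed system communicating with each other: a sender (Sender) and a receiver (Receiver)
2. Add the RabbitMQ dependency to the pom of both projects
        <!-- Spring Boot support for AMQP -->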
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
3. Configure the MQ connection parameters in application.properties
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=scan
spring.rabbitmq.password=scan
spring.rabbitmq.virtual-host=/ws

The username, password, and virtual host can be configured in the management UI [http://localhost:15672/#/vhosts]

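4. Send a message to the queue from the producer
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)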
@SpringBootTest
public class WsScanApplicationTests {
	@Autowired
	private RabbitTemplate amqpTemplate;

	@Test
	public void contextLoads() {
		List<ScanData> aa = new ArrayList<>();
		{
			ScanData a = new ScanData();
			a.setAPI_ID("API_ID1111111111");
			a.setAPI_VER("API_VER11111");
			a.setBAG_ID("BAG111111");
			a.setBCH_ID("BRC_ID11111");
			aa.add(a);
		}
		{
			ScanData b = new ScanData();
			b.setAPI_ID("API_ID22222");
			b.setAPI_VER("API_VER2222222");
			b.setBAG_ID("BAG222222222222");
			b.setBCH_ID("BRC_ID22222222");
			aa.add(b);
		}
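		// Publish to the default exchange with routing key "ws_scan_data",
		// which delivers to the queue of that name (the queue must already exist)
		amqpTemplate.convertAndSend("ws_scan_data", aa);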
	}
}
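The test above relies on a ScanData class that these notes do not show. A minimal sketch of what it might look like, assuming four String fields inferred from the setters used above. The field types, the getters, and the Serializable marker are assumptions, not the original class:

```java
import java.io.Serializable;

// Hypothetical reconstruction of the payload class. Only the four setter names
// come from the sender test above; everything else is assumed.
// In a real project this would normally be a public class in ScanData.java.
class ScanData implements Serializable {
    private static final long serialVersionUID = 1L;

    private String apiId;
    private String apiVer;
    private String bagId;
    private String bchId;

    public String getAPI_ID() { return apiId; }
    public void setAPI_ID(String apiId) { this.apiId = apiId; }

    public String getAPI_VER() { return apiVer; }
    public void setAPI_VER(String apiVer) { this.apiVer = apiVer; }

    public String getBAG_ID() { return bagId; }
    public void setBAG_ID(String bagId) { this.bagId = bagId; }

    public String getBCH_ID() { return bchId; }
    public void setBCH_ID(String bchId) { this.bchId = bchId; }
}
```

Serializable matters only while RabbitTemplate uses its default SimpleMessageConverter, which sends payloads other than String and byte[] through Java serialization. Once a JSON converter is configured, the getters and setters are what the conversion relies on.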
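5. Handle the message in the consumer
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component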
public class WcForWsReceiver {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    @RabbitListener(queues = "ws_scan_data")
    public void process(byte[] data) {
        System.out.println("Receiver  : " + new String(data));
        // Demo only: re-publish the payload to ws_exchange; a real consumer would run its business logic on the data here
        amqpTemplate.convertAndSend("ws_exchange","",data);
    }

}

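This is the simplest possible test. By default, received messages are acknowledged automatically (auto ack); manual acknowledgment can also be configured.
The following configuration sets the number of concurrent consumers: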

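import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration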
public class RabbitMqConfig {

    /** Number of concurrent consumers per listener. */
    public static final int DEFAULT_CONCURRENT = 4;
    /** Maximum number of unacknowledged deliveries each consumer may hold (prefetch count). */
    public static final int DEFAULT_PREFETCH_COUNT = 50;

    @Autowired
    public ConnectionFactory connectionFactory;
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("ws_exchange");
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        // Apply the prefetch limit; the constant is a prefetch count, not a consumer cap
        factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
        return factory;
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}
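The notes above mention that manual acknowledgment can be configured but do not show it. A minimal sketch of a manually acknowledging listener, assuming Spring AMQP 2.2 or later (where @RabbitListener has an ackMode attribute). The class name ManualAckReceiver and the handle method are hypothetical, and this listener is meant as a replacement for the auto-ack consumer, not an addition to it:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical listener illustrating manual acknowledgment.
@Component
public class ManualAckReceiver {

    // ackMode = "MANUAL" tells the container not to ack on our behalf
    @RabbitListener(queues = "ws_scan_data", ackMode = "MANUAL")
    public void process(Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            handle(message.getBody());
        } catch (RuntimeException e) {
            // Reject without requeue; the broker drops the message
            // or dead-letters it if a dead-letter exchange is configured
            channel.basicNack(tag, false, false);
            return;
        }
        // Acknowledge only this delivery (multiple = false)
        channel.basicAck(tag, false);
    }

    // Placeholder for real business logic (assumption for this sketch)
    private void handle(byte[] body) {
        System.out.println("Receiver  : " + new String(body, StandardCharsets.UTF_8));
    }
}
```

On older versions, the same effect can be obtained by calling setAcknowledgeMode(AcknowledgeMode.MANUAL) on the SimpleRabbitListenerContainerFactory. Taking the raw Message as the parameter sidesteps payload conversion, which keeps the sketch independent of which MessageConverter is configured.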
6. Exchange modes

https://www.cnblogs.com/zhangweizhong/p/5713874.html
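As one concrete case of an exchange mode, a fanout exchange copies every message to all queues bound to it and ignores the routing key. A minimal sketch of declaring the ws_scan_data queue and binding a second queue to the ws_exchange fanout exchange defined in RabbitMqConfig. The class name FanoutBindingConfig and the queue name ws_scan_data_fanout are hypothetical, and the sketch assumes the FanoutExchange bean from RabbitMqConfig is the only one in the context:

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration; Spring AMQP declares these on the broker
// when the first connection is opened.
@Configuration
public class FanoutBindingConfig {

    // Durable queue that the sender publishes to via the default exchange
    @Bean
    public Queue scanDataQueue() {
        return new Queue("ws_scan_data", true);
    }

    // Assumed second queue that receives whatever is published to ws_exchange
    @Bean
    public Queue fanoutTargetQueue() {
        return new Queue("ws_scan_data_fanout", true);
    }

    // Fanout bindings take no routing key
    @Bean
    public Binding fanoutBinding(FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutTargetQueue()).to(fanoutExchange);
    }
}
```

ws_scan_data is deliberately not bound to ws_exchange here: the consumer shown earlier republishes each message to that exchange, so such a binding would make it receive its own output in a loop. If ws_scan_data already exists on the broker with different arguments, the declaration will fail and the existing queue should be reused or deleted first.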