RabbitMQ(二)confirm/return机制
程序用了1.5.3.RELEASE版本的spring-boot-starter-amqp依赖。
confirm确认机制
配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="rabbitConnectionFactory"
username="${rabbit_username}"
password="${rabbit_password}"
host="${rabbit_host}"
port="${rabbit_port}"
publisher-confirms="true"/>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory"
exchange="exchangeTest"
confirm-callback="publishService"
mandatory="true"/>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"> </rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean id="confirmListener" class="com.example.hello2.controller.ConfirmListener"></bean>
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual" >
<rabbit:listener queues="queueTest" ref="confirmListener" />
</rabbit:listener-container>
</beans>
重点是:
生产者:
@Service("publishService")
public class PublishService implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String exchange,String routingKey, Object message) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("correlationId:"+correlationId);
rabbitTemplate.convertAndSend(exchange, routingKey, message,correlationId);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationId:"+correlationData);
if (ack) {
System.out.println("消息成功发送到exchange");
System.out.println( correlationData.toString());
} else {
System.out.println("消息发送exchange失败:" + cause);
}
}
}
消费者:
@Service("confirmListener")
public class ConfirmListener implements MessageListener {
@Override
public void onMessage(Message message) {
String messageStr = new String(message.getBody());
System.out.println("消费者接收到信息 : " + messageStr);
}
}
发送正确的消息:
http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey&message=hello
发送exchange正确,queue错误的请求:
http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey2&message=hello
只有confirm()中收到回调,但是消费者并没有接收到
发送exchange错误的请求:
http://localhost:8081/test/send?exchange=exchangeTest2&key=queueTestKey&message=hello
说明了confirm机制是只保证了消息到达exchange,并不保证消息可以路由到正确的queue
return机制
mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,
它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。
-
mandatory
当标志位true:若交换机无法找到消息对应的队列,将会调用basic.return将消息返回给生产者。
当标志位false:消息直接被丢弃 -
immediate(参数已不被支持)
标志位为true:交换机将消息路由到队列,但是队列上没有消费者,调用basic.return将消息返回给生产者
标志位为false:消息被丢弃
mandatory的实现:
配置文件
发送者:
@Service("publishService")
public class PublishService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String exchange,String routingKey, Object message) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("correlationId:"+correlationId);
rabbitTemplate.convertAndSend(exchange, routingKey, message,correlationId);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationId:"+correlationData);
if (ack) {
System.out.println("消息成功发送到exchange");
System.out.println( correlationData.toString());
} else {
System.out.println("消息发送exchange失败:" + cause);
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println( " 消息没有成功发送到消费者队列");
}
}
发送exange正确,queue不正确的消息:
http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey2&message=hello
结果: