rabbitmq--headers头路由模型
rabbitmq中有fanout、direct、topic 和headers交换机,前面3种我们都介绍过了,本篇我们介绍下headers交换机。
headers交换机是一种比较复杂且少见的交换机,不同于direct和topic,它不关心路由key是否匹配,而只关心header中的key-value对是否匹配(这里的匹配为精确匹配,包含键和值都必须匹配), 有点类似于http中的请求头。
headers头路由模型中,消息是根据prop即请求头中key-value来匹配的。
消费方指定的headers中必须包含一个"x-match"的键。
键"x-match"的值有2个:all和any。
all:表示消费方指定的所有key-value都必须在消息header中出现并匹配。
any:表示消费方指定的key-value至少有一个在消息header中出现并匹配即可。
示例代码:
生产者
package com.tingcream.rabbitmq.header;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class HeaderProducer {
private static String EXCHANGE_NAME="my_headers_ex";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主机 端口 vhost 用户名 密码
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "headers");
// channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
Map<String, Object> header = new HashMap<String, Object>();
header.put("name", "张三");
header.put("idcard","123321");
header.put("phone","13567655555");
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(header);
String message = "Hello headers消息!";
// channel.basicPublish(exchange, routingKey, props, body);
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
System.out.println("headerProducer发送消息:"+message);
channel.close();
connection.close();
}
}
消费者A
package com.tingcream.rabbitmq.header;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class HeaderConsumerA {
private static String EXCHANGE_NAME="my_headers_ex";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主机 端口 vhost 用户名 密码
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "headers");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
String queueName = channel.queueDeclare().getQueue();
Map<String, Object> header = new HashMap<String, Object>();
header.put("x-match", "all"); //x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。
header.put("name", "张三");
header.put("idcard","123321");
channel.queueBind(queueName, EXCHANGE_NAME, "", header);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
依次运行消费者、生产者程序。
生产者输出:
headerProducer发送消息:Hello headers消息!
消费者输出:
Hello headers消息!
登录管路后台,发现exchange中多出了一个my_headers_ex的交换机,类型为headers类型。