rabbitmq--headers头路由模型

 

rabbitmq中有fanout、direct、topic 和headers交换机,前面3种我们都介绍过了,本篇我们介绍下headers交换机。

headers交换机是一种比较复杂且少见的交换机,不同于direct和topic,它不关心路由key是否匹配,而只关心header中的key-value对是否匹配(这里的匹配为精确匹配,包含键和值都必须匹配), 有点类似于http中的请求头。

headers头路由模型中,消息是根据prop即请求头中key-value来匹配的。

消费方指定的headers中必须包含一个"x-match"的键。

键"x-match"的值有2个:all和any。

all:表示消费方指定的所有key-value都必须在消息header中出现并匹配。

any:表示消费方指定的key-value至少有一个在消息header中出现并匹配即可。

 

示例代码:

生产者

package com.tingcream.rabbitmq.header;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class HeaderProducer {
	
	private static   String    EXCHANGE_NAME="my_headers_ex"; 
	
    public static void main(String[] args) throws Exception {
	  ConnectionFactory factory = new ConnectionFactory();
      
		//主机  端口  vhost 用户名 密码
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
		
		Connection connection=factory.newConnection();
	    Channel channel=connection.createChannel();
		    
	   // channel.exchangeDeclare(EXCHANGE_NAME, "headers");
	   //  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
	    
	    Map<String, Object> header = new HashMap<String, Object>();
	    header.put("name", "张三");
	    header.put("idcard","123321"); 
	    header.put("phone","13567655555");
        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(header);

        String message = "Hello headers消息!";
        
       // channel.basicPublish(exchange, routingKey, props, body);
        channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
         System.out.println("headerProducer发送消息:"+message);
        channel.close();
        connection.close();
 
	     
	  
  }
}

消费者A

package com.tingcream.rabbitmq.header;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class HeaderConsumerA {
	
   private static   String    EXCHANGE_NAME="my_headers_ex"; 
   public static void main(String[] args)  throws Exception {
	  ConnectionFactory factory = new ConnectionFactory();
		//主机  端口  vhost 用户名 密码
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
		
		Connection connection=factory.newConnection();
	    Channel channel=connection.createChannel();
		    
	  //  channel.exchangeDeclare(EXCHANGE_NAME, "headers");
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
	    
	    
	    String queueName = channel.queueDeclare().getQueue();
	    Map<String, Object> header = new HashMap<String, Object>();
	    header.put("x-match", "all");  //x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。
	    header.put("name", "张三");
	    header.put("idcard","123321"); 
	    
	    channel.queueBind(queueName, EXCHANGE_NAME, "", header);
	    
	    Consumer consumer = new DefaultConsumer(channel){
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              String message = new String(body, "UTF-8");
              System.out.println(message);
          }
      };
	    
      channel.basicConsume(queueName, true, consumer);
     
	
	  
  }

  
}

依次运行消费者、生产者程序。

生产者输出:

headerProducer发送消息:Hello headers消息!
消费者输出:

Hello headers消息!

rabbitmq--headers头路由模型登录管路后台,发现exchange中多出了一个my_headers_ex的交换机,类型为headers类型。