详解RabbitMq应用
详解rabbitmq 之exchange和routingkey、queue和死亡队列的关系
Exchange
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type,
boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException
exchange:交换机名称
type:交换机类型
durable:是否持久化,持久化后重启rabbitmq不会删除,需要配合autoDelete使用
autoDelete:是否自动删除,当这个true时,上面的不生效,没有连接后,exchange将会被删除
arguments:配置备用的exchange名称。If messages to this exchange cannot otherwise be routed, send them to the alternate exchange named here.(Sets the "alternate-exchange" argument.)
Rabbitmq Exchange Type 说明
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout
所有bind到此exchange的queue都可以接收消息,此时exchange会将收到的消息分发到所有与之绑定的queue上面,roukey在此不再生效。
direct
通过routingKey和exchange决定的那个唯一的queue可以接收消息,不接受通配
topic
所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等,注意,这里的通赔符的分隔是以点(.)为隔开的,匹配点隔开的字符,如有一个消息发送到routkey为test.rout.key的,则可以设置channel.queueBind(QUEUE_1, EXCHANGE, "test.rout.#"); channel.queueBind(QUEUE_2, EXCHANGE, "test.#.key");
channel.queueBind(QUEUE_1, EXCHANGE, "test.rout.*"); channel.queueBind(QUEUE_2, EXCHANGE, "test.*.key");
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
header
通过exchange和header来决定分发到那些queue,与routkey无关,匹配模式与header中的x-match有关,此变量只默认是all,可设置为any
消费者:
Channel channel = connection.createChannel();
Map<String, Object> headers = new HashMap<>();
headers.put("testH1","value1");
headers.put("testH3","value1");
headers.put("x-match", "all"); // arguments.put("x-match", "any");
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS, false, true,null);
channel.queueDeclare(QUEUE_1, false, false, true, null);
channel.queueBind(QUEUE_1, EXCHANGE, "gdfgdsf", headers);
生产者:
Channel channel = connection.createChannel();
Map<String, Object> headers= new HashMap<>();
headers.put("testH1","value3");
headers.put("testH2","value3");
AMQP.BasicProperties pros = new AMQP.BasicProperties().builder().headers(headers).build();
channel.basicPublish(EXCHANGE,"gsdfsddsfsdfsdf",pros,MESSAGE.getBytes("UTF-8"));
routing key 和binding key其实是一个东西,如果不一样,将不会路由到,配合exchange的类型使用。
Queue
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete,Map<String, Object> arguments) throws IOException
queue: 队列名称
durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景
autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
arguments:一个map集合,支持下列配置
Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties.build(), message.getBytes(“UTF-8”));
Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp,注意,这个配置后,队列在指定时间内没有被访问,会连同队列都被删除掉,不管是否设置了autoDelete为true或者是false,autoDelete失效
Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS, true, false,null);
channel.exchangeDeclare(EXCHANGE_2, BuiltinExchangeType.HEADERS, true, false,null);
Map<String,Object> argument = new HashMap<>();
argument.put("x-message-ttl", 1000 * 10);
//argument.put("x-max-length", 9); // 这个和上面设置过期时间,两种方式都可以生效
argument.put("x-dead-letter-exchange", EXCHANGE_2);
channel.queueDeclare(QUEUE_1, true, false, false, argument);
Map<String, Object> headers = new HashMap<>();
headers.put("testH1","value1");
headers.put("testH3","value1");
headers.put("x-match", "any");
channel.queueBind(QUEUE_1, EXCHANGE, "gdfgdsf", headers);
Map<String, Object> arguments = new HashMap<>();
arguments.put("testH1","value1");
arguments.put("testH2","value3");
AMQP.BasicProperties pros = new AMQP.BasicProperties().builder().headers(arguments).build();
channel.basicPublish(EXCHANGE, "gsdfsddsfsdfsdf", pros, MESSAGE.getBytes(Charset.forName("UTF-8")));
当x-dead-letter-exchange的type为FANOUT时,此时可以单独配置x-dead-letter-exchange即可生效,所有绑定到此x-dead-letter-exchange上面的queue都接收到相应的消息,当x-dead-letter-exchange为HEADER时,需要相应的header匹配才可以生效。除此之外,需要与下面的routing key相配合,才能够转发到相应的queue上面。
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
需要配合x-dead-letter-exchange使用,不能单独使用x-dead-letter-routing-key
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
Master locator(x-queue-master-locator)