消息中间件(RabbitMQ)04

一、rabbitmq实现了AMQP协议

      AMQP是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。主要实现有 RabbitMQ。

二、AMQP包含的要素

    生产者:消息的创建者,发送到rabbitmq;

    消费者:连接到rabbitmq,订阅到队列上,消费消息,持续订阅(basicConsumer)和单条订阅(basicGet).

    消息:包含有效载荷和标签,有效载荷指要传输的数据,,标签描述了有效载荷,并且rabbitmq用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁。

    信道,概念:信道是生产消费者与rabbit通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用

(为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能,每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接)连接到rabbit上。)

    交换器、队列、绑定、路由键 :

     队列通过路由键(routing  key,某种确定的规则)绑定到交换器,生产者将消息发布到交换器,交换器根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收。

消息中间件(RabbitMQ)04

(详细的解释请查看:http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html

三、消息确认

        消费者收到的每一条消息都必须进行确认(自动确认和非自行确认,非自行确认下,rabbitmq会一直等待确认,除非忘了断开)。

        消费者在声明队列时,可以指定autoAck参数,当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

        采用消息确认机制后,只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

        当autoAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

        RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

四、交换器的类型:

    共有四种direct,fanout,topic,headers,其种headers(几乎和direct一样)不实用,可以忽略。

    消息中间件(RabbitMQ)04

直连型交换器(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换器用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

  • 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为R的消息被发送给直连交换器时,交换器会把它路由给绑定值同样为R的队列。

直连交换器经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

直连型交换器图例:

 消息中间件(RabbitMQ)04

扇型交换器(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换器上,当有消息发送给此扇型交换器时,交换器会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换器处理消息的广播路由(broadcast routing)。

因为扇型交换器投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
  • 分发系统使用它来广播各种状态和配置更新
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。

 扇型交换机图例:

消息中间件(RabbitMQ)04

主题交换器(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列 ,

通过使用“*”和“#”,使来自不同源头的消息到达同一个队列,”.”将路由键分为了几个标识符,“*”匹配1个,“#”匹配一个或多个。例如日志处理:

假设有交换器log-exchange,

日志级别有error,info,warning,

应用模块有user,order,email,

服务器有 A、B、C、D

路由键的规则为 服务器+“.”+日志级别+“.”+应用模块名,如:A. info .email。

1、要关注A服务器发送的所有应用错误的消息,怎么做?

声明队列名称为“a-app-error-queue”并绑定到交换器上:channel. queueBind (‘a-app-error-queue’,’logs-change’,’A.error.*’)

2、关注B服务器发送的的所有日志,怎么办?

声明队列名称为“b-all-queue”并绑定到交换器上:channel. queueBind (b-all-queue’,’logs-change’,’ B.#’)或channel. queueBind (b-all-queue’,’logs-change’,’ B.*.*’)

3、关注所有服务器发送的email的所有日志,怎么办?

声明队列名称为“email-all-queue”并绑定到交换器上:channel. queueBind (email -all-queue’,’logs-change’,’ *.*.emal’)

4、想要接收所有日志:channel->queue_bind(‘all-log’,’logs-change’,’#’)

 头交换器(headers exchange)有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换器(headers exchange)就是为此而生的。头交换器使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

我们可以绑定一个队列到头交换器上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

头交换器可以视为直连交换器的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。

五、虚拟主机

虚拟消息服务器,vhost,本质上就是一个mini版的mq服务器,有自己的队列、交换器和绑定,最重要的,自己的权限机制。Vhost提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。Vhost必须在连接时指定,rabbitmq包含缺省vhost:“/”,通过缺省用户和口令guest进行访问。

rabbitmq里创建用户,必须要被指派给至少一个vhost,并且只能访问被指派内的队列、交换器和绑定。Vhost必须通过rabbitmq的管理控制工具创建。