RabbitMQ —— 一、几种 Exchange 模式
RabbitMQ —— 一、几种 Exchange 模式
RabbitMQ 里的 Exchange 提供了四种模式,或者叫它类型,它们是:fanout
,direct
,topic
和header
。其中前三种模式我们用的比较多。
一、Fanout Exchange
所有发送到 Fanout Exchange 的消息都会被转发到与该Exchange 绑定(Binding)的所有 Queue 上。
Fanout Exchange 不需要处理 RouteKey 。只需要简单的将队列绑定到 exchange 上。这样发送到 exchange 的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,Fanout Exchange 转发消息是最快的。
- 1、可以理解为路由表的模式
- 2、这种模式不需要RouteKey
- 3、这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
- 4、如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
/**
* 生产者
*/
private static void ProducerMessage(MyMessage msg) {
var advancedBus = CreateAdvancedBus();
if (advancedBus.IsConnected) {
var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);
advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
} else {
Console.WriteLine("Can't connect");
}
}
/**
* 消费者
*/
private static void ConsumeMessage() {
var advancedBus = CreateAdvancedBus();
var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);
var queue = advancedBus.QueueDeclare("user.notice.wangwu");
advancedBus.Bind(exchange, queue, "user.notice.wangwu");
advancedBus.Consume(queue, registration => {
registration.Add<MyMessage>((message, info) => { Console.WriteLine("Body: {0}", message.Body); });
});
}
二、Direct Exchange
所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。
Direct 模式,可以使用 rabbitMQ 自带的 Exchange:default Exchange 。所以不需要将 Exchange 进行任何绑定(binding)操作 。消息传递时,RouteKey 必须完全匹配,才会被队列接收,否则该消息会被抛弃。
- 1、一般情况可以使用rabbitMQ自带的 Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
- 2、这种模式下不需要将Exchange进行任何绑定(binding)操作
- 3、消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
- 4、如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
/**
* 生产者
*/
private static void ProducerMessage(MyMessage msg) {
var advancedBus = CreateAdvancedBus();
if (advancedBus.IsConnected) {
var queue = advancedBus.QueueDeclare("user.notice.zhangsan");
advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
} else {
Console.WriteLine("Can't connect");
}
}
/**
* 消费者
*/
private static void ConsumeMessage() {
var advancedBus = CreateAdvancedBus();
var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Direct);
var queue = advancedBus.QueueDeclare("user.notice.lisi");
advancedBus.Bind(exchange, queue, "user.notice.lisi");
advancedBus.Consume(queue, registration => {
registration.Add<MyMessage>((message, info) => {
Console.WriteLine("Body: {0}", message.Body);
});
});
}
三、Topic Exchange
所有发送到 Topic Exchange 的消息被转发到所有关心 RouteKey 中指定 Topic 的 Queue 上,
Exchange 将 RouteKey 和某 Topic 进行模糊匹配。此时队列需要绑定一个 Topic。可以使用通配符进行模糊匹配,符号 “#” 匹配一个或多个词,符号 “*” 匹配不多不少一个词。因此 “log.#” 能够匹配到 “log.info.oa”,但是 “log.*” 只会匹配到 “log.error”。
- 1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个 “标题”(RouteKey),Exchange 会将消息转发到所有关注主题能与 RouteKey 模糊匹配的队列。
- 2、这种模式需要 RouteKey,也许要提前绑定 Exchange 与 Queue。
- 3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及 log 的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
- 4、“#” 表示 0个或若干个关键字,“*” 表示一个关键字。如 “log.*”能与 “log.warn” 匹配,无法与 “log.warn.timeout” 匹配;但是 “log.#” 能与上述两者匹配。
- 5、同样,如果Exchange没有发现能够与 RouteKey 匹配的 Queue,则会抛弃此消息。
/**
* 生产者
*/
private static void ProducerMessage(MyMessage msg) {
// 创建消息 bus
IBus bus = CreateBus();
try {
bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
} catch (EasyNetQException ex) {
//处理连接消息服务器异常
}
bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
}
/**
* 消费者
*/
private static void ConsumeMessage(MyMessage msg) {
// 创建消息 bus
IBus bus = CreateBus();
try {
bus.Subscribe<MyMessage>(msg.MessageRouter, message => Console.WriteLine(msg.MessageBody), x => x.WithTopic("user.notice.#"));
} catch (EasyNetQException ex) {
//处理连接消息服务器异常
}
}
四、持久化
Exchange、queue 都可以设置为持久化,但消息最终是否被持久化,还要看 消息的持久化设置,这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
。