RabbitMQ学习笔记
一、开发环境搭建
1.安装Erlang环境
下载地址:OTP 18.3 Windows 64-bit Binary File
2.安装RabbitMQ服务端
下载地址:Windows
打开命令行工具,进入RabbitMQ安装目录下的sbin文件夹,依次输入以下命令:
以服务的形式安装RabbitMQ
rabbitmq-service install
启动RabbitMQ服务
rabbitmq-service start
注意:RabbitMQ的安装路径中不能包含空格,否则会出现难以排查的错误。
3.下载RabbitMQ的客户端程序集。
下载地址:rabbitmq-dotnet-client-3.6.1-dotnet-4.5.zip
解压得到的RabbitMQ.Client.dll就是客户端的dll
4.启用管理界面工具
rabbitmq-plugins enable rabbitmq_management
在浏览器中输入地址 http://localhost:15672/#/ 即可访问管理页面。
二、消息队列中消息的整体处理流程及名词解析
RabbitMQ处理消息流程图
三、Exchange三种常用的模式
Fanout模式
/// <summary>
/// Producer: declares the fanout exchange "exchange_Fanout" and publishes a
/// single UTF-8 encoded message to it.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        const string exchangeName = "exchange_Fanout";
        const string message = "Hello,World";

        // NOTE(review): the credential/host part of this URI looks mangled
        // (possibly by e-mail obfuscation where the snippet was copied from) —
        // replace it with the real broker address before running.
        var connectionFactory = new ConnectionFactory();
        connectionFactory.Uri = "amqp://admin:[email protected]:5672/Foo";

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchangeName, "fanout");

            // Published with an empty routing key and no message properties.
            var payload = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchangeName, "", null, payload);

            Console.WriteLine("消息{0}被发送", message);
        }

        // Wait for a key press before the process exits.
        Console.ReadKey();
    }
}
/// <summary>
/// Consumer: binds a broker-named queue to the fanout exchange
/// "exchange_Fanout" and prints every message it receives.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // NOTE(review): the credential/host part of this URI looks mangled
        // (possibly by e-mail obfuscation where the snippet was copied from) —
        // replace it with the real broker address before running.
        var factory = new ConnectionFactory() { Uri = "amqp://admin:[email protected]:5672/Foo" };

        // Dispose the connection as well as the channel so that it is closed
        // when the consumer stops, instead of being left open.
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // The queue name is taken from the declare result rather than chosen here.
            var queueName = channel.QueueDeclare().QueueName;
            channel.ExchangeDeclare(exchange: "exchange_Fanout", type: "fanout");
            channel.QueueBind(queueName, "exchange_Fanout", "");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
            };

            // noAck: true — this consumer never sends an explicit acknowledgement.
            channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

            // Block the main thread; otherwise the channel would be disposed immediately.
            Console.ReadLine();
        }

        Console.ReadKey();
    }
}
Direct模式
/// <summary>
/// Producer: declares a durable queue and a durable direct exchange, binds
/// them with the routing key "Hello", and publishes one persistent message.
/// </summary>
class Program
{
    static string queueName = "queue_direct";
    static string exchangeName = "exchange_direct";
    static string routingKey = "Hello";

    static void Main(string[] args)
    {
        // NOTE(review): the credential/host part of this URI looks mangled
        // (possibly by e-mail obfuscation where the snippet was copied from) —
        // replace it with the real broker address before running.
        var connectionFactory = new ConnectionFactory();
        connectionFactory.Uri = "amqp://admin:[email protected]:5672/Foo";

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the durable queue.
            channel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Declare the durable exchange.
            channel.ExchangeDeclare(
                exchange: exchangeName,
                type: "direct",
                durable: true,
                autoDelete: false,
                arguments: null);

            // Bind the queue to the exchange.
            channel.QueueBind(
                queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey,
                arguments: null);

            // Delivery mode 2 makes the message persistent.
            var messageProperties = channel.CreateBasicProperties();
            messageProperties.DeliveryMode = 2;

            string message = "Hello,World";
            var payload = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchangeName, routingKey, messageProperties, payload);

            Console.WriteLine("消息{0}被发送", message);
        }

        // Wait for a key press before the process exits.
        Console.ReadKey();
    }
}
/// <summary>
/// Consumer: reads messages from the durable queue "queue_direct", prints
/// them, and acknowledges each one explicitly.
/// </summary>
class Program
{
    static string queueName = "queue_direct";

    static void Main(string[] args)
    {
        // NOTE(review): the credential/host part of this URI looks mangled
        // (possibly by e-mail obfuscation where the snippet was copied from) —
        // replace it with the real broker address before running.
        var factory = new ConnectionFactory() { Uri = "amqp://admin:[email protected]:5672/Foo" };

        // Dispose the connection as well as the channel so that it is closed
        // when the consumer stops, instead of being left open.
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the durable queue.
            channel.QueueDeclare(queueName, true, false, false, null);

            // Limit how many messages the server forwards at once (prefetch count 1).
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));

                // Acknowledge this single delivery.
                channel.BasicAck(ea.DeliveryTag, false);
            };

            // noAck: false — acknowledgements are sent from the handler above.
            channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);

            // Block the main thread so the channel stays open while messages arrive.
            Console.ReadLine();
        }

        Console.ReadKey();
    }
}
Topic模式
/// <summary>
/// Producer: declares a durable topic exchange and a durable queue, binds
/// them with the pattern "Hello.Topic.*", and publishes one persistent
/// message using the routing key "Hello.Topic.World".
/// </summary>
class Program
{
    static string exchangeName = "exchange_topic";
    static string queueName = "queue_topic";

    static void Main(string[] args)
    {
        // NOTE(review): the credential/host part of this URI looks mangled
        // (possibly by e-mail obfuscation where the snippet was copied from) —
        // replace it with the real broker address before running.
        var connectionFactory = new ConnectionFactory();
        connectionFactory.Uri = "amqp://admin:[email protected]:5672/Foo";

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(
                exchange: exchangeName,
                type: "topic",
                durable: true,
                autoDelete: false,
                arguments: null);

            channel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            channel.QueueBind(queueName, exchangeName, "Hello.Topic.*");

            // Delivery mode 2 makes the message persistent.
            var messageProperties = channel.CreateBasicProperties();
            messageProperties.DeliveryMode = 2;

            string message = "Hello,World";
            var payload = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchangeName, "Hello.Topic.World", messageProperties, payload);

            Console.WriteLine("消息{0}被发送", message);
        }

        // Wait for a key press before the process exits.
        Console.ReadKey();
    }
}
/// <summary>
/// Consumer: reads messages from the durable queue "queue_topic", prints
/// them, and acknowledges each one explicitly.
/// </summary>
class Program
{
    static string queueName = "queue_topic";

    static void Main(string[] args)
    {
        // NOTE(review): the credential/host part of this URI looks mangled
        // (possibly by e-mail obfuscation where the snippet was copied from) —
        // replace it with the real broker address before running.
        var factory = new ConnectionFactory() { Uri = "amqp://admin:[email protected]:5672/Foo" };

        // Dispose the connection as well as the channel so that it is closed
        // when the consumer stops, instead of being left open.
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the durable queue.
            channel.QueueDeclare(queueName, true, false, false, null);

            // Prefetch count 1.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));

                // Acknowledge this single delivery.
                channel.BasicAck(ea.DeliveryTag, false);
            };

            // noAck: false — acknowledgements are sent from the handler above.
            channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);

            // Block the main thread so the channel stays open while messages arrive.
            Console.ReadLine();
        }

        Console.ReadKey();
    }
}