使用RabbitMQ简单发送接收消息
参考:http://blog.****.net/lmj623565791/article/details/37607165
http://blog.****.net/anzhsoft/article/details/19563091
http://www.rabbitmq.com/tutorials/tutorial-one-java.html
https://www.cnblogs.com/LiangSW/p/6218886.html
https://www.cnblogs.com/piaolingzxh/p/5448927.html
举个例子,假如你想投递一封邮件,你可以将邮件投递到某个邮箱,然后邮递员从邮箱中获取邮件,并将邮件交付到接收方,在这个过程中,RabbitMQ就 类似于邮箱 、邮局和邮递员,RabbitMQ是一个消息队列,它可以接收程序发送的消息,然后放入到相应的消息队列中,另外一些程序可以从消息队列中获取数据,以此完成程序之间的通信.
下面介绍一些概念:
1、生产者
程序中,发送消息的一方称为生产者
2、消息队列
用来存储生产者发送的消息
3、消费者
从消息队列中获取消息的一方称为消费者
4、Connection
Connection是一个TCP连接,消费者和生产者通过TCP连接到RabbitMQ server
5.Channel
channel建立在Conection之上,因为频繁建立关闭TCP连接会影响到性能,因此使用Channel发送、接收消息
(1)通过channel的queueDeclare()方法可以指定要发送消息/接收消息的队列
queue:队列名称
durable:是否持久化,队列和转发器都有durable属性,默认为false,此时如果rabbitmq重启,队列和转发器都会消失。设置为ture可以幸免于此。
exclusive:是否排外,为true时表示只可以在本次连接中(同一个连接)访问。
autoDelete:是否自动删除,即没有消费者时是否删除转发器
-
/**
-
* Declare an exchange.
-
* @see com.rabbitmq.client.AMQP.Exchange.Declare
-
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
-
* @param exchange the name of the exchange
-
* @param type the exchange type
-
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
-
* @param autoDelete true if the server should delete the exchange when it is no longer in use
-
* @param arguments other properties (construction arguments) for the exchange
-
* @return a declaration-confirm method to indicate the exchange was successfully declared
-
* @throws java.io.IOException if an error is encountered
-
*/
-
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
-
Map<String, Object> arguments) throws IOException
(2)通过channel的basicPublish()方法可以向队列中发送消息
exchange:转发器,可以传入空"",""是默认的转发器
routingKey:路由键,根据路由键转发器决定将消息发送到哪个消费者队列
body:消息的字节形式
其他参数请参考:https://www.cnblogs.com/piaolingzxh/p/5448927.html
-
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
-
this.delegate.basicPublish(exchange, routingKey, props, body);
-
}
-
public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException {
-
this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
-
}
-
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException {
-
this.delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body);
-
}
6.Exchange
转发器,生产者发送的消息一般不会直接交给消费者队列,而是用转发器作为中转,生产者首先将消息发送到转发器,转发器根据路由规则决定将消息发送到哪个队列。转发器类型有direct、topic、fanout和headers.
下面是一个简单的例子,Sender向消息队列中发送消息,Receiver从消息队列中获取消息,转发器使用空的转发器(默认转发器,direct类型),需要传递队列名称,转发器会把队列名称当做路由键,将消息发送到对应的队列中。
1.创建一个发送者类
-
package com.rabbit.test1;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
/**
-
* RabbitMQ发送消息
-
* Sender
-
*/
-
public class Sender {
-
//队列名称
-
private final static String QUEUE_NAME = "queue-test";
-
public static void main(String[] argv) throws java.io.IOException {
-
//1.创建一个ConnectionFactory连接工厂connectionFactory
-
ConnectionFactory connectionFactory = new ConnectionFactory();
-
//2.通过connectionFactory设置RabbitMQ所在IP等信息
-
connectionFactory.setHost("localhost");
-
// connectionFactory.setPort(5762); //指定端口
-
// connectionFactory.setUsername("admin");//用户名
-
// connectionFactory.setPassword("admin");//密码
-
//3.通过connectionFactory创建一个连接connection
-
Connection connection = connectionFactory.newConnection();
-
//4.通过connection创建一个频道channel
-
Channel channel = connection.createChannel();
-
//5.通过channel指定一个队列
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
//发送的消息
-
String message = "hello world!";
-
//6.通过channel向队列中添加消息,第一个参数是转发器,使用空的转发器(默认的转发器,类型是direct)
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
-
System.out.println("向" + QUEUE_NAME + "中添加了一条消息:" + message);
-
//7.关闭频道
-
channel.close();
-
//8.关闭连接
-
connection.close();
-
}
-
}
2.接收者
-
package com.rabbit.test1;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
import com.rabbitmq.client.QueueingConsumer;
-
import com.rabbitmq.client.QueueingConsumer.Delivery;
-
/**
-
* RabbitMQ接收消息
-
* Receiver
-
*
-
*/
-
public class Receiver {
-
//队列名称
-
private final static String QUEUE_NAME = "queue-test";
-
public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
-
//1.创建一个ConnectionFactory连接工厂connectionFactory
-
ConnectionFactory connectionFactory = new ConnectionFactory();
-
//2.通过connectionFactory设置RabbitMQ所在IP等信息
-
connectionFactory.setHost("localhost");
-
//3.通过connectionFactory创建一个连接connection
-
Connection connection = connectionFactory.newConnection();
-
//4.通过connection创建一个频道channel
-
Channel channel = connection.createChannel();
-
//5.通过channel指定队列
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
//与发送消息不同的地方
-
//6.创建一个消费者队列consumer,并指定channel
-
QueueingConsumer consumer = new QueueingConsumer(channel);
-
//7.为channel指定消费者
-
channel.basicConsume(QUEUE_NAME, true, consumer);
-
while (true) {
-
//从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
-
Delivery delivery = consumer.nextDelivery();
-
String message = new String(delivery.getBody());
-
System.out.println("接收到了" + QUEUE_NAME + "中的消息:" + message);
-
}
-
}
-
}
3.启动Sender,向消息队列中添加信息
访问RabbitMQ的管理界面,可以看到自动创建了一个queue-test的队列,并且有一条消息
启动Reciever,可以看到接收者从消息队列中获取了消息并打印在控制台