RabbitMQ学习(一)
简单队列:
P:消息生产者
红色区域:消息队列
C:消息消费者
代码实操:
先写一个获取连接的工具类:
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException{
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//AMQP:5672
factory.setPort(5672);
//vhost:就相当于数据库
factory.setVirtualHost("/vhost_admin");
//设置用户信息
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
return connection;
}
}
定义消息生产者:
public class Send {
//队列名称
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello simple mq!";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("--send msg:"+msg);
channel.close();
connection.close();
}
}
定义消费者:
public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body);
System.out.println("consumer:"+msgString);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
测试:启动生产者和消费者,发现消费者成功收到消息
简单队列的不足:
耦合性太高,生产者一一对应消费者,如果这时想有多个消费者消费队列中的消息,就不行了。
Work queues工作队列:
一个生产者对应对个消费者。
代码实操:
定义生产者:
public class Send {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//
for(int i =0;i<50;i++){
String msg = "hello"+i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
定义两个消费者:
public class Recv1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body);
System.out.println("consumer 1 :"+msgString);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("consumer 1 done");
}
}
};
//监听一个队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
public class Recv2 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body);
System.out.println("consumer 2 :"+msgString);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("consumer 2 done");
}
}
};
//监听一个队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
启动三个类:测试结果:
发现两个消费者都进行消息的消费。
通过控制台打印可以看出,两个消费者使用轮询分发的方式对消息进行消费,任务消息均分。
工作队列的不足:如果现在有两个消费者,有一个消费者A比较繁忙,一个消费者B比较清闲。如果使用轮询分发,结果导致A一直在处理消息。我们想要的结果是消费者B对请求处理的多一些,这就引入了basicQos(perfetch=1),这个就是用来限制RabbitMQ每次只发一个消息到消费者,消费者处理完告诉队列,队列才会继续发给消费者。这需要消费者需要手动反馈一个消息告诉队列。这种方式称为公平分发:
公平分发代码实现:
生产者:
只需要添加如下代码:
/*
* 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
*
* 限制发送给同一个消费者不得超过一条消息
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
消费者:
//手动回执一个消息
channel.basicAck(envelope.getDeliveryTag(), false);
并将autoAck改为false。
测试结果:
消息应答
//自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
boolean autoAck = true;自动确认模式。一旦RabbitMQ将消息分发给消费者,就会从内存中删除;
这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息,
boolean autoAck = false;手动模式。如果有一个消费者挂掉,那么我们应该将这个消息再发送给其他消费者,因为RabbitMQ没有收到已经挂掉的消息的消息回执。RabbitMQ支持消息应答,消费者发送一个消息应答告诉RabbitMQ,这个消息我已经处理完成,可以删除了,然后RabbitMQ就会删除内存中的这个消息。
消息应答默认是打开的,即false。
但是如果RabbitMQ挂掉,任务仍然会丢失,这时就需要对消息进行持久化。
Publish/Subscribe模式:
订阅模式:一个生产者,多个消费者。每个消费者都有自己的队列,队列要绑定到交换机上。X:交换机。生产者将消息发送到交换机,交换机将消息推送到队列上。
代码实操:
生产者:
public class Send {
private static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //分发
//发送消息
String msg = "hello ps";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("Send:"+msg);
channel.close();
connection.close();
}
}
消费者:
public class Recv1 {
private static String QUEUE_NAME = "test_queue_fanout_email";
private static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicQos(1);//一次只分发一个
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body);
System.out.println("consumer 1 :"+msgString);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("consumer 1 done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
另一个消费者代码几乎一样,就不在贴了。
测试结果:
查看RabbitMQ控制台,发现交换机绑定了两个队列:
在这里先解释一下Exchange(交换机):
作用:接收生产者的消息,向队列推送消息
类型:
匿名转发:“”
fanout(不处理路由键):消息发到交换机,只要和交换机绑定的队列,都能收到消息。
Direct(处理路由键):发送消息的时候需要带一个路由的Key,队列也需要一个Key,如果绑定的队列与这个路由Key相匹配的话,交换机就会把消息转到这个队列中
Topic Exchange:
将路由键和某模式进行匹配。
#:匹配一个或多个
*:匹配一个
路由模式:
代码实操:
生产者:
public class Send {
private static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //分发
//发送消息
String msg = "hello routing";
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者1:设置Key为error
public class Recv1 {
private static String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body);
System.out.println("consumer 1 :"+msgString);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("consumer 1 done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
如果生产者的key为error时,两个消费者都能接收到:
如果生产者的key为info或warning,只有第二个消费者能收到:
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
Topic模式:
代码实操:
生产者:
public class Send {
private static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String msgString = "商品...";
channel.basicPublish(EXCHANGE_NAME, "goods.add", null, msgString.getBytes());
System.out.println("send:"+msgString);
channel.close();
connection.close();
}
}
消费者1:
public class Recv1 {
private static String EXCHANGE_NAME = "test_exchange_topic";
private static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body);
System.out.println("consumer 1 :"+msgString);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("consumer 1 done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
测试:
修改Send:
channel.basicPublish(EXCHANGE_NAME, "goods.del", null, msgString.getBytes());
消费者接收情况: