rabbitmq 第三章
由上一篇 《rabbitmq 第二章》 学习了rabbitmq的基本使用之 "hello world"
。
从官方的RabbitMQ Tutorials 得知还有四种模式,接下来开始学习 Work queues
(工作模式)。
由上图可知,有一个生产者发送消息到队列里面,然后有两个消费者接收消息,这里需要注意的一点就是,生成者发送消息到消息队列里面,消息会从两个消费者中找一个来接收消息(轮询分发)。
接下来开始编码,在第二种的代码基础上先新建一个package名字为work
编写生产者代码,具体如下(片段)
// 队列名字
private final static String WORK_QUEUE_NAME = "workdemo";
public static void main(String[] args) throws Exception {
// 获取连接
Connection mqConnection = MQConnectionUtil.getMQConnection();
// 创建通道
Channel channel = mqConnection.createChannel();
// 声明队列
channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
for (int i = 0; i < 100; i++) {
// 发送消息
channel.basicPublish("",WORK_QUEUE_NAME,null,("这是生产者发送的消息::"+i).getBytes());
}
// 关闭连接
channel.close();
mqConnection.close();
}
消费者1代码
private final static String WORK_QUEUE_NAME = "workdemo";
public static void main(String[] args) throws Exception{
// 获取连接
Connection mqConnection = MQConnectionUtil.getMQConnection();
// 创建通道
final Channel channel = mqConnection.createChannel();
// 声明队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 打印接收到的消息
System.out.println("消费者1接收到的消息是:::"+new String(body));
// 确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 注册消费者,并且为手动确认已收到消息
channel.basicConsume(WORK_QUEUE_NAME,false,consumer);
}
消费者2代码
private final static String WORK_QUEUE_NAME = "workdemo";
public static void main(String[] args) throws Exception{
// 获取连接
Connection mqConnection = MQConnectionUtil.getMQConnection();
// 创建通道
final Channel channel = mqConnection.createChannel();
// 声明队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 打印接收到的消息
System.out.println("消费者2接收到的消息是:::"+new String(body));
// 确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 注册消费者,并且为手动确认已收到消息
channel.basicConsume(WORK_QUEUE_NAME,false,consumer);
}
OK,代码编写完成启动 两个消费者,然后启动生产者。
由上可知,队列中的消息只有一个消费者接收到了。而且是比较均匀的收到了消息,消费者1和2 收到的消息都是一样的,那么有一种情况,有的机器处理快有的机器处理慢,此时怎么办呢?
模拟场景,可以在消费者1 sleep(100),消费者2 sleep(500)。观察控制台就可以看见sleep100的消费者早早的就结束了,而消费者2还在继续执行。针对这样问题,能不能让效率高的消费的更多消息,以便提高效率呢?
在官方的RabbitMQ Tutorials中,有这样的一段话:
In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting.
This tells RabbitMQ not to give more than one message to a worker at a time.
Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one.
Instead, it will dispatch it to the next worker that is not still busy.
channel.basic_qos(prefetch_count=1)
看不懂这段话?没关系,意思就是有这么一个配置 channel.basic_qos(prefetch_count=1)
可以让效率高的机器处理更多的消息,这个配置分别配置在2个消费者的代码里面,具体如下:
再次运行消费者1和2,然后运行生产者,结果如下:
知识点小结
任务队列的消息分发机制分两种:轮询分发(Round-robin)和公平分发(Fair dispatch)。
1.轮询分发
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
2.公平分发
我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。