RabbitMq的官方文档翻译(教程2)

场景2:单发送多接收

        使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。

RabbitMq的官方文档翻译(教程2)

准备

我们并没有像调整图片大小、渲染PDF这样真实的任务,所以我们将会发送一些字符串代表复杂的任务,通过使用Thread.sleep()来假装我们的业务系统很繁忙,小数点的数量代表这个任务的复杂性:每一个小数点代表一秒,比如,Hello...代表这个任务需要三秒。

我们稍微修改了一下上一篇的程序,来让他可以发送任意数量的消息,这个程序将会给我们的工作队列安排任务,所以我们给他命名为NewTash.java

RabbitMq的官方文档翻译(教程2)

RabbitMq的官方文档翻译(教程2)

我们之前的Recv.java也需要修改一下,他要根据消息体里的小数点数量来增加工作的秒数。他会传递消息并且执行他。改名为Worker.java

RabbitMq的官方文档翻译(教程2)

RabbitMq的官方文档翻译(教程2)

循环调度

使用任务队列的好处之一就是可以轻易的平行工作(即部署分布式worker),当面对积压了非常大量的消息的情况时,只需要增加worker的部署数量就可以轻易解决了。

我们尝试同时跑两个worker,即消息的接受者,他们会怎么样共同工作呢?

RabbitMq的官方文档翻译(教程2)

运行消息发送者,发送五条消息

RabbitMq的官方文档翻译(教程2)

消息接受者的情况如下:

RabbitMq的官方文档翻译(教程2)

worker1接收到了1、3、5条消息,worker2接受到了2、4条消息。

默认情况下,rabbitMq会按照顺序的方式给每个消费者发送消息,平均每个消费者都能得到相同数量的消息,这种方式叫做round-robin(循环??)。读者可以自行尝试三个甚至更多的消费者的情况。

消息答复

Worker接收到消息后,完成他的业务代码需要一些时间,你可能想知道在一个消费者接收到一个消息,然后执行业务代码到一部分的时候挂掉了会怎么办。根据我们现在的代码来说,rabbitMq把消息传递给消费者后就会把这些消息删除掉,在这种情况下,如果你干掉了一个worker,我们就会失去这个worker正在执行的,以及所有rabbitMq派发给他并且还没来得及执行的消息。

但是我们并不想失去任何的任务消息,如果一个worker挂掉了,我们想把这个worker头上的任务消息派发到其他的worker头上。

为了确保消息不会丢失,rabbitMq支持消息答复。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack给rabbitMq服务器告诉他我已经执行完成了,你可以把这条消息删除了。

如果一个消费者没有返回消息答复就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitMq就会明白,这个消息还没有被完成,rebbitMq就会重新把这条消息放入队列,如果在这个时间有其他的消费者在线,那么rabbitMq就会迅速的吧这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

就算执行一个消息用了非常长的时间,也不会有任何问题。

手动消息答复默认是开启的,前面的例子我们通过autoAck=ture把他们关闭了。我们现在要把它设置为false,然后从一个worker那里发送一个合适的答复。

RabbitMq的官方文档翻译(教程2)

这样编码的话,就算你用Ctrl+C杀掉一个正在处理消息的worker进程也不会丢失任何消息,worker挂掉之后,没有答复的消息就会被自动重新传递。

消息持久化

我们已经学到了如何确保就算消费者挂掉消息也不会丢失。但是如果我们的ribbitMq服务器停了的话,我们的消息任务仍然会丢失。

当rabbitMq服务器停止或者崩溃的时候,它就会忘掉所有的队列和消息,除非你告诉它不要这么做。。要确保消息不会丢失我们要做两件事:我们需要使队列和消息持久化。

首先,我们要确保rbbitMq不会丢失我们的队列,我们要做的是声明队列为可持久化的。

RabbitMq的官方文档翻译(教程2)

尽管命令是正确的,基于我们目前的设置他也不会生效。因为我们已经定义了一个叫hello的非持久化的队列。rabbitMq不会允许你用不同的参数重新定义一个已经存在的队列,如果你这么做,会返回一个错误。我们这有一个快速的变通方案-我们什么一个不同名字的队列。如task_queue:

RabbitMq的官方文档翻译(教程2)

这个队列声明的变化需要同时应用于生产者和消费者的代码。

这个时候我们就相当确定这个叫task_queue的队列就算rabbitMq重启也不会丢失了。现在我们需要通过设置MessageProperties的值为PERSISTENT_TEXT_PLAIN.把我们的消息标记成可持久化的。

RabbitMq的官方文档翻译(教程2)

注意:把消息标记成持久化的并不能完全保证消息不会丢失。尽管他告诉了rabbitMq要把这条消息保存到磁盘上,但是仍然有少数情况rabbitMq接收到消息还没来得及保存它。需要更强壮的保证机制 publisher confirms.

公平的分配机制

你可能注意到了现在的派发机制并没有像我们想象的那样工作。比如有两个worker的情况下,当所有奇数的消息都很重、偶数消息都很轻的时候,一个worker就会不断地繁忙工作,但是另一个就几乎不工作。但是这些对于rabbitMq服务器来说是不可知的。

这种情况是因为rabbitMq服务器并不去查看每个消费者未答复的消息的数量,它只是盲目的派发消息到消费者。

RabbitMq的官方文档翻译(教程2)

为了避免这种情况,我们可以使用basicQos方法,设置prefetchCount=1。这就告诉rabbitMq不要把多个消息同时派发给同一个worker。换句话说就是,在一个worker没有完成和答复前一个消息之前,不要给他派发新消息。相应的,它会把新消息派发给一个不忙的worker。

RabbitMq的官方文档翻译(教程2)

注意队列的大小:

当所有的worker都非常忙碌时,你的队列可能会被填满,所以你要注意你的队列大小,适当的增加更多的worker,或者使用其他的策略。

综合起来,代码如下。

RabbitMq的官方文档翻译(教程2)

 

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

 

翻译自:http://www.rabbitmq.com/tutorials/tutorial-two-java.html

参考:https://www.cnblogs.com/luxiaoxun/p/3918054.html

 

 

转载于:https://my.oschina.net/katkrazy/blog/1564100