RocketMQ入门-几种使用方式

参考官网http://rocketmq.apache.org/docs/quick-start/
无序消费在上一篇里写过,这里主要讲有序消费、广播消费、延时消费、批量消费

有序消费

RocketMQ的每个topic下会有多个queue,默认是4个,对于每个queue可以保证先进先出。

  • 生产者角度
    如果生产者直接发送消息到topic,消息可能会进入到任何一个队列导致无序,所以,对于需要顺序的消息,如一个用户连续3次付款操作,需要发到同一个队列中。
  • 消费者角度
    消费者在消费队列的时候需要按序消费,使用MessageListenerOrderly即可。
    使用MessageListenerOrderly的消费者会定期去锁住topic的所有队列,保证只有它在消费,且在本地使用单线程操作来保证本地按序消费。
//生产者
//模拟100个用户
for(int userId=0;userId<100;userId++){
	//每个用户连续3次付款
	for(int payNum=0;payNum<3;payNum++){
		Message msg = new Message();
		msg.setTopic("payTopic");
		msg.setBody(("用户" + userId + "第" + payNum + "次付款操作").getBytes());
		producer.send(msg, new MessageQueueSelector() {
			@Override
			public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
				int index = (Integer)(arg) % mqs.size();
				return mqs.get(index);
			}
		}, userId);
	}
}

//消费者
consumer.registerMessageListener(new MessageListenerOrderly() {		
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
		for(int i=0; i<msgs.size(); i++){
			MessageExt msg = msgs.get(i);
			System.out.println(msg.getTopic() + " " + msg.getTags() + " " + new String(msg.getBody()));
		}
		return ConsumeOrderlyStatus.SUCCESS;
	}
});
consumer.start();

广播消费

RocketMQ支持集群消费和广播消费。

  • 集群消费
    默认是集群消费模式,对于同一个消费组里的消费者,会分摊消息,如A消费了B就不能消费。
    测试的时候发现,一个进程里无法创建一个消费组的两个消费者,会报错,必须分两个进程去创建,创建的时候groupName要一致,才会认为在同一个消费组。
  • 广播消费
    对于同一个消费组里的消费者,每个消费者都能收到每一份消息,AB能消费同一个消息,相当于广播。
    测试的时候不小心在控制台将consumer的consumeBroadcastEnable设为false,导致无法消费集群消息,如果出现类似问题可以注意一下。
    RocketMQ入门-几种使用方式
//groupName必须一致才认为在一个消费组里,同一个消费组里的消费者应该订阅一样的topic
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myGroupName");
//默认为集群模式,此处修改为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

延时消费

RocketMQ不支持精准的延时,只能在broker.conf中设置时延级别

messageDelayLevel = 1s 5s 10s

在发送消息的时候设置时延级别

//根据配置,1代表1s,2代表5s,3代表10s,如果设置为0代表没有时延
//如下发送则会在5s后收到延时消息
msg.setDelayTimeLevel(2);
producer.send(msg);

批量消费

RocketMQ支持批量消费,但是批量的消息的topic需要一样,tag则不限制。

List<Message> list = new ArrayList<>();
list.add(new Message("mytopic", "tag1", "body".getBytes()));
list.add(new Message("mytopic", "tag2", "body".getBytes()));
producer.send(list);