RocketMQ入门-几种使用方式
参考官网http://rocketmq.apache.org/docs/quick-start/
无序消费在上一篇里写过,这里主要讲有序消费、广播消费、延时消费、批量消费
有序消费
RocketMQ的每个topic下会有多个queue,默认是4个,对于每个queue可以保证先进先出。
- 生产者角度
如果生产者直接发送消息到topic,消息可能会进入到任何一个队列导致无序,所以,对于需要顺序的消息,如一个用户连续3次付款操作,需要发到同一个队列中。 - 消费者角度
消费者在消费队列的时候需要按序消费,使用MessageListenerOrderly即可。
使用MessageListenerOrderly的消费者会定期去锁住topic的所有队列,保证只有它在消费,且在本地使用单线程操作来保证本地按序消费。
//生产者
//模拟100个用户
for(int userId=0;userId<100;userId++){
//每个用户连续3次付款
for(int payNum=0;payNum<3;payNum++){
Message msg = new Message();
msg.setTopic("payTopic");
msg.setBody(("用户" + userId + "第" + payNum + "次付款操作").getBytes());
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = (Integer)(arg) % mqs.size();
return mqs.get(index);
}
}, userId);
}
}
//消费者
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for(int i=0; i<msgs.size(); i++){
MessageExt msg = msgs.get(i);
System.out.println(msg.getTopic() + " " + msg.getTags() + " " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
广播消费
RocketMQ支持集群消费和广播消费。
- 集群消费
默认是集群消费模式,对于同一个消费组里的消费者,会分摊消息,如A消费了B就不能消费。
测试的时候发现,一个进程里无法创建一个消费组的两个消费者,会报错,必须分两个进程去创建,创建的时候groupName要一致,才会认为在同一个消费组。 - 广播消费
对于同一个消费组里的消费者,每个消费者都能收到每一份消息,AB能消费同一个消息,相当于广播。
测试的时候不小心在控制台将consumer的consumeBroadcastEnable设为false,导致无法消费集群消息,如果出现类似问题可以注意一下。
//groupName必须一致才认为在一个消费组里,同一个消费组里的消费者应该订阅一样的topic
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myGroupName");
//默认为集群模式,此处修改为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
延时消费
RocketMQ不支持精准的延时,只能在broker.conf中设置时延级别
messageDelayLevel = 1s 5s 10s
在发送消息的时候设置时延级别
//根据配置,1代表1s,2代表5s,3代表10s,如果设置为0代表没有时延
//如下发送则会在5s后收到延时消息
msg.setDelayTimeLevel(2);
producer.send(msg);
批量消费
RocketMQ支持批量消费,但是批量的消息的topic需要一样,tag则不限制。
List<Message> list = new ArrayList<>();
list.add(new Message("mytopic", "tag1", "body".getBytes()));
list.add(new Message("mytopic", "tag2", "body".getBytes()));
producer.send(list);