RocketMQ_04 RocketMQ入门之HelloWorld
RocketMQ_04 RocketMQ入门之HelloWorld
这是一个浮躁的社会,追求速成的浮躁风气是不可取的
一、准备工作
下载RocketMQ源码RocketMQ-master,选择其中的example模块,导入到IDEA中。导入完成后项目结构如下:
二、进入quickstart包
该包下有两个java类,分别是生产者Producer和消费者Consumer类
生产者类
package com.alibaba.rocketmq.example.quickstart;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
/**
* Producer,发送消息
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* 第一步:创建DefaultMQProducer类并设定生产者名称,设置setNamesrv,集群模式用;隔开,调用start方法启动生产者。
*/
DefaultMQProducer producer = new DefaultMQProducer("quickstart-producer-group");
producer.setNamesrvAddr("192.168.0.121:9876;192.168.0.128:9876");
producer.start();
//发送10条消息
for (int i = 0; i < 10; i++) {
try {
/**
* 第二步:使用Message类进行实例化消息,参数分别是:主题,标签,内容
*/
Message msg = new Message("TopicTest",//topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes()// body
);
/**
* 第三步:调用send方法发送消息,并关闭生产者
*/
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消费者类
package com.alibaba.rocketmq.example.quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* 第一步:创建DefaultMQPushConsumer类并设定消费者名称,设置setNamesrvAddr,集群模式用;隔开
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart-consumer-group");
consumer.setNamesrvAddr("192.168.0.121:9876;192.168.0.128:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 对头:CONSUME_FROM_FIRST_OFFSET
* 队尾:CONSUME_FROM_LAST_OFFSET
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 第二步:设置DefaultMQPushConsumer实例的订阅主题,一个消费者对象可以订阅多个主题,使用subscribe方法订阅【参数 1 主题名称,参数2 标签内容】,可以使用'||'对标签内容进行合并获取
*/
consumer.subscribe("TopicTest", "*");
/**
* 第三步: 消费者实例注册监听,设置registerMessageListener方法
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 第四步: 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接受数据
* 返回值有ConsumeConcurrentlyStatus.CONSUME_SUCCESS
* ConsumeConcurrentlyStatus.RECONSUME_LATER
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* 第五步:启动消费者实例对象,调用start方法
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
一般是先启动消费者类,进行监听消息,然后启动生产者生产消息。
生产者产生消息:
12:19:39.686 [NettyClientSelector_1] DEBUG io.netty.util.internal.Cleaner0 - java.nio.ByteBuffer.cleaner(): available
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000660, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F00000000000006E8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000770, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F00000000000007F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F0000000000000440, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F00000000000004C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F0000000000000550, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F00000000000005D8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000880, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000908, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=1], queueOffset=5]
消费者消费消息:
12:19:33.967 [NettyClientSelector_1] DEBUG io.netty.util.internal.Cleaner0 - java.nio.ByteBuffer.cleaner(): available
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=4, sysFlag=0, bornTimestamp=1553573979753, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947101, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000660, commitLogOffset=1632, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=4, sysFlag=0, bornTimestamp=1553573979764, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947107, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F00000000000006E8, commitLogOffset=1768, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979771, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947113, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000770, commitLogOffset=1904, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979775, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947118, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F00000000000007F8, commitLogOffset=2040, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979782, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947145, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F0000000000000440, commitLogOffset=1088, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979796, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947149, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F00000000000004C8, commitLogOffset=1224, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979800, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947152, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F0000000000000550, commitLogOffset=1360, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979806, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947159, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F00000000000005D8, commitLogOffset=1496, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=5, sysFlag=0, bornTimestamp=1553573979810, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947153, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000880, commitLogOffset=2176, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=5, sysFlag=0, bornTimestamp=1553573979813, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947155, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000908, commitLogOffset=2312, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, WAIT=true, TAGS=TagA}, body=16]]]
三、小结
Producer类
第一步:创建DefaultMQProducer类并设定生产者名称,设置setNamesrv,集群模式用;隔开,调用start方法启动生产者。
第二步:使用Message类进行实例化消息,参数分别是:主题,标签,内容
第三步:调用send方法发送消息,并关闭生产者
Consumer类
第一步:创建DefaultMQPushConsumer类并设定消费者名称,设置setNamesrvAddr,集群模式用;隔开
第二步:设置DefaultMQPushConsumer实例的订阅主题,一个消费者对象可以订阅多个主题,使用subscribe方法订阅【参数1主题名称,参数2标签内容】,可以使用’||'对标签内容进行合并获取
第三步: 消费者实例注册监听,设置registerMessageListener方法
第四步: 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接受数据
第五步:启动消费者实例对象,调用start方法