RocketMQ_04 Getting Started with RocketMQ: HelloWorld

We live in an impatient society, and the habit of chasing quick results is not one worth adopting.

I. Preparation

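Download the RocketMQ source code (RocketMQ-master), pick the example module, and import it into IDEA. After the import completes, the project structure looks like this:
[Figure: project structure of the example module after import]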

II. Open the quickstart package

This package contains two Java classes: the producer class Producer and the consumer class Consumer.

Producer class

package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;


/**
 * Producer: sends messages.
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
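        /**
         * Step 1: create a DefaultMQProducer with a producer group name, set the NameServer
         * address with setNamesrvAddr (in cluster mode, separate the addresses with ';'),
         * then call start() to start the producer.
         */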
        DefaultMQProducer producer = new DefaultMQProducer("quickstart-producer-group");

        producer.setNamesrvAddr("192.168.0.121:9876;192.168.0.128:9876");
        producer.start();

        // Send 10 messages
        for (int i = 0; i < 10; i++) {
            try {
                /**
                 * Step 2: instantiate the message with the Message class.
                 * The parameters are, in order: topic, tag, body.
                 */
                Message msg = new Message("TopicTest",//topic
                    "TagA",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );

                /**
                 * Step 3: call send() to send the message.
                 * The producer itself is shut down after the loop finishes.
                 */
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
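
The NameServer address passed to setNamesrvAddr is a single string in which several host:port entries are joined with ';'. How the client parses that string internally is not shown in this article, so the following is only a minimal sketch of the idea. The class NamesrvAddrList and its split method are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
final class NamesrvAddrList {

    // Splits a ';'-separated NameServer list into individual host:port entries,
    // skipping blank entries such as the one left by a trailing ';'.
    static List<String> split(String namesrvAddr) {
        List<String> result = new ArrayList<>();
        for (String part : namesrvAddr.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same address string as in the Producer listing.
        System.out.println(split("192.168.0.121:9876;192.168.0.128:9876"));
        // prints [192.168.0.121:9876, 192.168.0.128:9876]
    }
}
```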

Consumer class

package com.alibaba.rocketmq.example.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;


/**
 * Consumer: subscribes to messages.
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
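        /**
         * Step 1: create a DefaultMQPushConsumer with a consumer group name and set the
         * NameServer address with setNamesrvAddr (in cluster mode, separate the addresses with ';').
         */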
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart-consumer-group");

        consumer.setNamesrvAddr("192.168.0.121:9876;192.168.0.128:9876");
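        /**
         * Choose whether the consumer starts from the head or the tail of the queue on its first start.
         * Head: CONSUME_FROM_FIRST_OFFSET
         * Tail: CONSUME_FROM_LAST_OFFSET
         * On any later start, consumption resumes from the position reached last time.
         */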
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

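        /**
         * Step 2: subscribe the DefaultMQPushConsumer instance to a topic with subscribe().
         * One consumer object can subscribe to several topics.
         * Parameter 1 is the topic name, parameter 2 is the tag expression;
         * use '||' to combine several tags, or "*" to match every tag.
         */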
        consumer.subscribe("TopicTest", "*");

        /**
         * Step 3: register a message listener on the consumer with registerMessageListener.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            /**
             * Step 4: the listener implements MessageListenerConcurrently and overrides
             * consumeMessage to receive the messages.
             * Possible return values: ConsumeConcurrentlyStatus.CONSUME_SUCCESS
             *                         ConsumeConcurrentlyStatus.RECONSUME_LATER
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /**
         * Step 5: start the consumer instance by calling start().
         */
        consumer.start();

        System.out.println("Consumer Started.");
    }
}
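
The second argument of subscribe is a tag expression: "*" accepts every tag, and '||' combines several tags. The sketch below is a simplified model of that matching rule, written with plain Java collections. TagExpression is a hypothetical class for illustration and is not RocketMQ's own filtering code, which this article does not show.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a subscription tag expression
// such as "*" or "TagA || TagB". Not RocketMQ's actual filter.
final class TagExpression {
    private final boolean matchAll;
    private final Set<String> tags = new HashSet<>();

    TagExpression(String expression) {
        String trimmed = expression.trim();
        this.matchAll = trimmed.equals("*");
        if (!matchAll) {
            // "||" must be escaped because split() takes a regular expression.
            for (String tag : trimmed.split("\\|\\|")) {
                String t = tag.trim();
                if (!t.isEmpty()) {
                    tags.add(t);
                }
            }
        }
    }

    // Returns true if a message carrying this tag would be accepted.
    boolean matches(String messageTag) {
        return matchAll || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        TagExpression all = new TagExpression("*");
        TagExpression aOrB = new TagExpression("TagA || TagB");
        System.out.println(all.matches("TagA"));  // prints true
        System.out.println(aOrB.matches("TagB")); // prints true
        System.out.println(aOrB.matches("TagC")); // prints false
    }
}
```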

In general, start the consumer first so that it is already listening, then start the producer to send the messages.
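
The start order ties in with the setConsumeFromWhere setting: according to the comment in the Consumer listing, a first start with CONSUME_FROM_LAST_OFFSET begins at the tail of the queue, while a later start resumes from the previous position. The sketch below models only that rule as stated in the comment. StartOffset, Policy and the convention that a negative stored offset means "no recorded progress" are assumptions of this illustration, and the real client and broker may apply extra conditions.

```java
// Hypothetical model of the start-position rule described in the Consumer comments.
final class StartOffset {

    enum Policy { FROM_FIRST_OFFSET, FROM_LAST_OFFSET }

    // storedOffset < 0 means the consumer group has no recorded progress (first start).
    static long resolve(long storedOffset, long minOffset, long maxOffset, Policy policy) {
        if (storedOffset >= 0) {
            // Not the first start: continue where consumption stopped last time.
            return storedOffset;
        }
        return policy == Policy.FROM_FIRST_OFFSET ? minOffset : maxOffset;
    }

    public static void main(String[] args) {
        // First start, queue currently holds offsets 0..4 (max offset 5).
        System.out.println(resolve(-1, 0, 5, Policy.FROM_LAST_OFFSET));  // prints 5
        System.out.println(resolve(-1, 0, 5, Policy.FROM_FIRST_OFFSET)); // prints 0
        // Later start with recorded progress at offset 3.
        System.out.println(resolve(3, 0, 5, Policy.FROM_LAST_OFFSET));   // prints 3
    }
}
```

Under this model, a consumer that starts from the tail for the first time only sees messages sent after it started, which is one reason to launch it before the producer.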

Producer output:

12:19:39.686 [NettyClientSelector_1] DEBUG io.netty.util.internal.Cleaner0 - java.nio.ByteBuffer.cleaner(): available
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000660, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F00000000000006E8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000770, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F00000000000007F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F0000000000000440, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F00000000000004C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F0000000000000550, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8007900002A9F00000000000005D8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b , queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000880, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8008000002A9F0000000000000908, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a , queueId=1], queueOffset=5]
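
The ten SendResult lines above walk through queues 0 to 3 of broker-a, then queues 0 to 3 of broker-b, and then wrap around to broker-a again. That pattern is consistent with round-robin queue selection. The following is a minimal sketch of that idea, assuming two brokers with four queues each (as the log suggests) and a start index of 0. RoundRobinQueues and its methods are hypothetical names, and this is not the client's actual selection code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of round-robin queue selection.
final class RoundRobinQueues {

    // Lists every queue as "brokerName:queueId", broker by broker.
    static List<String> buildQueues(String[] brokers, int queuesPerBroker) {
        List<String> queues = new ArrayList<>();
        for (String broker : brokers) {
            for (int q = 0; q < queuesPerBroker; q++) {
                queues.add(broker + ":" + q);
            }
        }
        return queues;
    }

    // Picks one queue per message, cycling through the list in order.
    static List<String> pick(List<String> queues, int messageCount) {
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            chosen.add(queues.get(i % queues.size()));
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> queues = buildQueues(new String[] {"broker-a", "broker-b"}, 4);
        // Ten messages over eight queues: the last two wrap back to broker-a.
        for (String target : pick(queues, 10)) {
            System.out.println(target);
        }
    }
}
```

With these inputs the ninth and tenth picks are broker-a:0 and broker-a:1, the same order as the last two SendResult lines.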

Consumer output:

12:19:33.967 [NettyClientSelector_1] DEBUG io.netty.util.internal.Cleaner0 - java.nio.ByteBuffer.cleaner(): available
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=4, sysFlag=0, bornTimestamp=1553573979753, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947101, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000660, commitLogOffset=1632, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=4, sysFlag=0, bornTimestamp=1553573979764, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947107, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F00000000000006E8, commitLogOffset=1768, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979771, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947113, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000770, commitLogOffset=1904, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979775, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947118, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F00000000000007F8, commitLogOffset=2040, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979782, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947145, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F0000000000000440, commitLogOffset=1088, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979796, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947149, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F00000000000004C8, commitLogOffset=1224, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979800, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947152, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F0000000000000550, commitLogOffset=1360, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=2, sysFlag=0, bornTimestamp=1553573979806, bornHost=/192.168.0.107:52485, storeTimestamp=1553573947159, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F00000000000005D8, commitLogOffset=1496, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=5, sysFlag=0, bornTimestamp=1553573979810, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947153, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000880, commitLogOffset=2176, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=5, sysFlag=0, bornTimestamp=1553573979813, bornHost=/192.168.0.107:52484, storeTimestamp=1553573947155, storeHost=/192.168.0.128:10911, msgId=C0A8008000002A9F0000000000000908, commitLogOffset=2312, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, WAIT=true, TAGS=TagA}, body=16]]]
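
Comparing the fields in the output above, each msgId appears to encode where the message is stored: the first 8 hex digits match the storeHost IP, the next 8 match the port, and the last 16 match commitLogOffset. This layout is inferred from the logged values for the client version used here, and other versions may build message IDs differently. A minimal sketch of the decoding, using the hypothetical class MsgIdDecoder:

```java
// Hypothetical decoder for the 32-hex-digit msgId values seen in the log.
// The layout (4-byte IP, 4-byte port, 8-byte offset) is inferred from the output.
final class MsgIdDecoder {

    // Hex digits 0..7: the four octets of the store host IP.
    static String storeHost(String msgId) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            int octet = Integer.parseInt(msgId.substring(i * 2, i * 2 + 2), 16);
            if (i > 0) {
                sb.append('.');
            }
            sb.append(octet);
        }
        return sb.toString();
    }

    // Hex digits 8..15: the store host port.
    static int storePort(String msgId) {
        return Integer.parseInt(msgId.substring(8, 16), 16);
    }

    // Hex digits 16..31: the offset in the commit log.
    static long commitLogOffset(String msgId) {
        return Long.parseLong(msgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String msgId = "C0A8008000002A9F0000000000000660"; // first message in the log
        System.out.println(storeHost(msgId) + ":" + storePort(msgId)); // prints 192.168.0.128:10911
        System.out.println(commitLogOffset(msgId));                    // prints 1632
    }
}
```

The decoded values agree with storeHost=/192.168.0.128:10911 and commitLogOffset=1632 on the first consumer log line.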

III. Summary

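Producer class

Step 1: create a DefaultMQProducer with a producer group name, set the NameServer address with setNamesrvAddr (in cluster mode, separate the addresses with ';'), and call start() to start the producer.

Step 2: instantiate the message with the Message class; the parameters are, in order, topic, tag, and body.

Step 3: call send() to send the message, then shut the producer down with shutdown() once all messages have been sent.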

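Consumer class

Step 1: create a DefaultMQPushConsumer with a consumer group name and set the NameServer address with setNamesrvAddr (in cluster mode, separate the addresses with ';').

Step 2: subscribe the DefaultMQPushConsumer instance to a topic with subscribe(). One consumer object can subscribe to several topics. Parameter 1 is the topic name and parameter 2 is the tag expression; use '||' to combine several tags.

Step 3: register a message listener on the consumer with registerMessageListener.

Step 4: the listener only needs to implement the MessageListenerConcurrently interface and override consumeMessage to receive the messages.

Step 5: start the consumer instance by calling start().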