RocketMQ如何快速入门

RocketMQ如何快速入门,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

本章简单讲讲RocketMQ的入门操作,消息发送和消息接收。

引入 rocketmq-client

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>

编写Producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
            	//构建消息
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

查看结果

RocketMQ如何快速入门

编写Consumer

public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_test_push");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("TopicTest", "*");
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	System.out.println("  MessageBody: "+ msgbody);//输出消息内容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
				}
			});
			consumer.start();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

查看结果

RocketMQ如何快速入门

看到消费的结果大家可能有疑问,我们生产消息的时候是按照顺序生产的消息,消费时候为什么不是顺序消费下来的。

MQ消息的无序性,每个主题对应多个队列,生产消息时是根据算法放置不同的队列中,消费则就是无序了(有序消息后面讨论)

也有可能出现一条消息被消费了多次,RocketMQ的目标就是不丢数据,<u>每条消息至少发送一次</u>,内部通过ACK的确认机制实现的后面会重点讨论

消息管控台

为了方便的查看消息的详情我们可以通过消息的管控台更好的管理和查看消息详情,当然我们也可以通过后台的提供的命令来为运维提供更多的管理。

RocketMQ-Console地址: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

可以直接下载到本地之后通过mavne进行编译获取jar,该项目是SpringBoot项目

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

丢到linux服务器上启动

(1)启动时设置具体的RocketMQ的参数

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.10.12.203:9876;10.10.12.204:9876

(2)直接修改rocketmq-console-ng-1.0.0.jar中的配置文件,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根据自己的NamesrvAddr进行修改rocketmq.config.namesrvAddr的值,默认端口12581

浏览器登录查看控制台信息

RocketMQ如何快速入门

查看RocketMQ集群的节点信息

RocketMQ如何快速入门

根据主题时间段查询消息

RocketMQ如何快速入门

查看某条消息的具体信息

RocketMQ如何快速入门

管控台提供了很多运维功能能极大的提高我们的运维效率,里面的功能包括创建主题、修改主题、发送消息、对消费者的信息进行查看等功能我们不一一介绍,可以简单的了解使用。

看完上述内容,你们掌握RocketMQ如何快速入门的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!