RocketMQ下载安装及基本使用

环境说明:在虚拟机下,在Linux redhat6.4系统下,安装RocketMQ软件。

目录
一、    下载 
二、    安装
1.    解压后重命名为ROCKETMQ
2.    创建定制化文件夹
三、    软件运行基础配置
1.    修改软件运行内存
2.    XML文件路径修改
3.    开通指定端口的防火墙
四、    配置单节点ROCKETMQ服务
1.    修改服务配置
1.1    修改hosts
1.2    修改broker.conf
2.    启动 NAMESRV服务
3.    启动BROKER
4.    查看服务器启动日志
5.    关闭ROCKETMQ服务
五、    测试单节点的ROCKETMQ代码
1.    POM.XML文件
2.    生产者代码
3.    消费者代码
六、    页面可视化操作ROCKETMQ项目
1.    代码下载
2.    代码调整
3.    启动项目
4.    访问项目
七、    配置ROCKETMQ两主和两从集群:(机器不够,未尝试)
八、    查看TOPIC相关信息和状态
九、    访问ROCKETMQ错误问题列举 

 

一、下载

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip

二、安装

1. 解压后重命名为rocketmq

mv 原名称 修改后的名称

RocketMQ下载安装及基本使用

2.创建定制化文件夹
在rocketmq文件夹下创建logs,store文件夹;
在store下创建commitlog和consumequeue,index文件夹。
logs:存储RocketMQ日志目录
store:存储RocketMQ数据文件目录
commitlog:存储RocketMQ消息信息
consumequeue,index:存储消息的索引数据

RocketMQ下载安装及基本使用

RocketMQ下载安装及基本使用

三、软件运行基础配置
1.修改软件运行内存
/usr/local/rocketmq/bin
vi runserver.sh

原:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn8g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改后:JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vi runbroker.sh

原:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn8g"

修改后:JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"

2.xml文件路径修改
cd /usr/local/rocketmq/conf
sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

把该文件夹下xml文件,里的{user.home}替换成/usr/local/rocketmq

RocketMQ下载安装及基本使用

原:{user.home}

替换后:

RocketMQ下载安装及基本使用

 

3.开通指定端口的防火墙
端口:9876,61306,10911,10909

vim /etc/sysconfig/iptables

RocketMQ下载安装及基本使用

修改保存后,重启防火墙:

service iptables restart

四、配置单节点RocketMQ服务
1.修改服务配置
1.1修改hosts

vi /etc/hosts

增加一条记录

192.168.0.102  rocketmq-nameserver1

1.2修改broker.conf

namesrvAddr=rocketmq-nameserver1:9876

brokerClusterName = DefaultCluster

brokerName = broker-a

brokerIP1=192.168.0.102

brokerId = 0

deleteWhen = 04

fileReservedTime = 48

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

2.启动 namesrv服务
在bin目录下:
nohup sh mqnamesrv &
查看启动的进程(jps全局变量)

jps

RocketMQ下载安装及基本使用

RocketMQ下载安装及基本使用

3.启动broker

nohup sh ./bin/mqbroker -n 192.168.0.102:9876 -c conf/broker.conf autoCreateTopicEnable=true &

RocketMQ下载安装及基本使用

4.查看服务器启动日志
查看namesrv服务日志

tail -f /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log -n 50

查看broker服务日志

tail -f /usr/local/rocketmq/logs/rocketmqlogs/broker.log -n 50

RocketMQ下载安装及基本使用

5.关闭rocketmq服务
关闭broker服务 :sh bin/mqshutdown broker

关闭namesrv服务:sh bin/mqshutdown namesrv

RocketMQ下载安装及基本使用

五、测试单节点的RocketMQ代码
1.pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cheng</groupId>

  <artifactId>rocketMQTest</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>war</packaging>

 

  <dependencies>

  <dependency>

            <groupId>javax.servlet</groupId>

            <artifactId>servlet-api</artifactId>

            <version>2.5</version>

            <scope>provided</scope>

        </dependency>

 

        <dependency>

            <groupId>javax.servlet</groupId>

            <artifactId>jsp-api</artifactId>

            <version>2.0</version>

            <scope>provided</scope>

        </dependency>

       

        <dependency>

            <groupId>org.apache.rocketmq</groupId>

            <artifactId>rocketmq-client</artifactId>

            <version>4.5.2</version>

        </dependency>

 

  </dependencies>

 

  <build>

        <plugins>

            <!-- JDK选版本 -->

            <plugin>

                <artifactId>maven-compiler-plugin</artifactId>

                <configuration>

                    <source>1.8</source>

                    <target>1.8</target>

                    <encoding>UTF-8</encoding>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-war-plugin</artifactId>

                <version>2.6</version>

                <configuration>

                    <warSourceDirectory>src/main/webapp</warSourceDirectory>

                    <failOnMissingWebXml>false</failOnMissingWebXml>

                    <overlays>

                    </overlays>

                </configuration>

            </plugin>

        </plugins>

    </build>

 

 

</project>

2.生产者代码

package com.cheng;

 

import java.text.SimpleDateFormat;

import java.util.Date;

 

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

 

public class RocketProducer {

     public static void main(String[] args) throws InterruptedException {

         // 需要一个producer group名字作为构造方法的参数,这里为producer1

         DefaultMQProducer producer = new DefaultMQProducer("SELF_TEST_P_GROUP");

 

         // 设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔

         // NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里

         producer.setNamesrvAddr("192.168.0.102:9876");

          producer.setInstanceName(String.valueOf(System.currentTimeMillis()));

         // 解决rocketmq生产者发送消息超时

         producer.setSendMsgTimeout(15000); // 毫秒单位,重要的点!!!!!!!!!!!

 

         // 为避免程序启动的时候报错,添加此代码,可以让rocketMq自动创建topickey

        

         try {

              producer.start();

         } catch (MQClientException e1) {

              e1.printStackTrace();

              System.out.println("start失败:" + e1.toString());

         }

        

         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:MM:sss");

 

         try {

              Message message = new Message("threezto-test", "tag", "key",

                       ("美丽中国-2").getBytes(RemotingHelper.DEFAULT_CHARSET));

              System.out.println("开始发送:" + sdf.format(new Date()));

              SendResult sendResult = producer.send(message);

              System.out.println("发送完成:" + sdf.format(new Date()));

 

              System.out.println("发送的消息ID:" + sendResult.getMsgId() + "--- 发送消息的状态:" + sendResult.getSendStatus());

         } catch (Exception e) {

              e.printStackTrace();

         }

 

         producer.shutdown();

     }

}

运行效果

开始发送:2019-50-27 09:10:029

发送完成:2019-50-27 09:10:034

发送的消息ID:C0A800648C134E25154F8A9536F10000--- 发送消息的状态:SEND_OK

3.消费者代码

package com.cheng;

 

import java.util.List;

 

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

 

public class RocketConsumer {

     public static void main(String[] args) throws MQClientException {

         //设置消费者组

         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");

 

         consumer.setVipChannelEnabled(false);

         String ADDR = "192.168.0.102:9876";

         consumer.setNamesrvAddr(ADDR);

         //设置消费者端消息拉取策略,表示从哪里开始消费

         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 

         //设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤

         consumer.subscribe("threezto-test", "*");

 

         //消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery类似

         consumer.registerMessageListener(new MessageListenerConcurrently() {

 

             @Override

             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                 for (MessageExt messageExt : msgs) {

                     String topic = messageExt.getTopic();

                     String tag = messageExt.getTags();

                     String msg = new String(messageExt.getBody());

                     System.out.println("*********************************");

                     System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + msg + ", tag:" + tag + ", topic:" + topic);

                     System.out.println("*********************************");

                 }

 

                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

             }

         });

 

         //调用start()方法启动consumer

         consumer.start();

         System.out.println("Consumer Started....");

}

}

运行效果

Consumer Started....

*********************************

消费响应:msgId : C0A800647A6F4E25154F8A862A2C0000,  msgBody : 美丽中国, tag:tag, topic:threezto-test

*********************************

*********************************

消费响应:msgId : C0A800648C134E25154F8A9536F10000,  msgBody : 美丽中国-2, tag:tag, topic:threezto-test

*********************************

六、页面可视化操作RocketMQ项目
1.代码下载

https://github.com/apache/rocketmq-externals/tree/master

2.代码调整

项目:rocketmq-console-ng

Pom.xml部分jar导入失败,pom里就注释掉

maven-checkstyle-plugin

maven-surefire-plugin

3.启动项目
只开启rocketmq-console-ng

项目是springboot微服务启动

RocketMQ下载安装及基本使用

4.访问项目

http://localhost:8080/#/

RocketMQ下载安装及基本使用

RocketMQ下载安装及基本使用

七、配置RocketMQ两主和两从集群:(机器不够,未尝试)
配置节点:
cd /usr/local/rocketmq/conf/2m-2s-async
vi broker-a.properties

八、查看topic相关信息和状态

查看topic列表

在bin目录下

./mqadmin topicList -n 192.168.0.102:9876

查看topic的状态

bin/mqadmin topicStatus -n 192.168.0.102:9876 -t SELF_TEST_TOPIC

RocketMQ下载安装及基本使用

查看所有消费组group

sh mqadmin consumerProgress -n 192.168.0.102:9876

RocketMQ下载安装及基本使用

查看topic信息列表详情统计

sh mqadmin topicstatus -n 192.168.0.102:9876 -t threezto-test

RocketMQ下载安装及基本使用

控制台创建topic

集群:DefaultCluster

Topic name:threezto-test -r

sh mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t topic名字 -r 12 -w 12

sh mqadmin updateTopic -c rocketmq-cluster -n 192.168.0.102:9876 -t topic-test 12 -w 12

 

九、访问RocketMQ错误问题列举

生产者访问的topic,没有注册

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).

RocketMQLog:WARN Please initialize the logger system properly.

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest

See http://rocketmq.apache.org/docs/faq/ for further details.

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:662)

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)

at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)

     at com.rocket.RocketProducer.main(RocketProducer.java:32)

生产者使用的topic是系统自动生成的,不能使用。

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).

RocketMQLog:WARN Please initialize the logger system properly.

org.apache.rocketmq.client.exception.MQClientException: The topic[TBW102] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.

For more information, please visit the url, http://rocketmq.apache.org/docs/faq/

at org.apache.rocketmq.client.Validators.checkTopic(Validators.java:124)

at org.apache.rocketmq.client.Validators.checkMessage(Validators.java:86)

at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:337)

     at com.rocket.RocketProducer.main(RocketProducer.java:32)

RocketMQ服务器没有开放10911端口

org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [5375]ms, Topic: threezto-test, BrokersSent: [broker-a, broker-a, broker-a]

See http://rocketmq.apache.org/docs/faq/ for further details.

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:638)

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)

at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)

at com.rocket.RocketProducer.main(RocketProducer.java:39)

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.102:10911> failed

at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:392)

at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:465)

at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:449)

at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:403)

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:831)

at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:557)

     ... 4 more