rocketmq学习笔记 一 hello world

http://blog.****.net/akfly/article/details/53395289

早就听说过rocketmq,一直没时间去研究它

最近打算从hello world开始,,深入的学习rocketmq 


0.git下载源码本地编译


git地址  https://github.com/alibaba/RocketMQ.git


下载完成后执行mvn 命令 构建工程    


mvn -Dmaven.test.skip=true clean package install assembly:assembly -U


rocketmq学习笔记 一 hello world


1. 从target目录中,解压  alibaba-rocketmq-broker.tar.gz    启动nameServer 和 brokerServer 


tar -xzvf alibaba-rocketmq-broker.tar.gz 


./mqnamesrv &


./mqbroker -n 127.0.0.1:9876 &


rocketmq学习笔记 一 hello world



2.执行example中的例子  发送消息


product


  1. /** 
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more 
  3.  * contributor license agreements.  See the NOTICE file distributed with 
  4.  * this work for additional information regarding copyright ownership. 
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0 
  6.  * (the "License"); you may not use this file except in compliance with 
  7.  * the License.  You may obtain a copy of the License at 
  8.  * 
  9.  *     http://www.apache.org/licenses/LICENSE-2.0 
  10.  * 
  11.  *  Unless required by applicable law or agreed to in writing, software 
  12.  *  distributed under the License is distributed on an "AS IS" BASIS, 
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14.  *  See the License for the specific language governing permissions and 
  15.  *  limitations under the License. 
  16.  */  
  17. package com.alibaba.rocketmq.example.quickstart;  
  18.   
  19. import com.alibaba.rocketmq.client.exception.MQClientException;  
  20. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
  21. import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;  
  22. import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
  23. import com.alibaba.rocketmq.client.producer.SendResult;  
  24. import com.alibaba.rocketmq.common.message.Message;  
  25. import com.alibaba.rocketmq.remoting.common.RemotingHelper;  
  26.   
  27. public class Producer {  
  28.     public static void main(String[] args) throws MQClientException, InterruptedException {  
  29.         DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");  
  30.         producer.setNamesrvAddr("127.0.0.1:9876");  
  31.         producer.start();  
  32.   
  33.         for (int i = 0; i < 100; i++) {  
  34.             try {  
  35.                 Message msg = new Message("TopicTest",// topic  
  36.                         "TagA",// tag  
  37.                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body  
  38.                 );  
  39.                 SendResult sendResult = producer.send(msg);  
  40.                 LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {  
  41.   
  42.                     @Override  
  43.                     public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {  
  44.                         // TODO Auto-generated method stub  
  45.                         return null;  
  46.                     }  
  47.                 };  
  48.   
  49.                 //producer.sendMessageInTransaction(msg, tranExecuter, arg)  
  50.                 System.out.println(sendResult);  
  51.             } catch (Exception e) {  
  52.                 e.printStackTrace();  
  53.                 Thread.sleep(1000);  
  54.             }  
  55.         }  
  56.   
  57.         producer.shutdown();  
  58.     }  
  59. }  


3.查看broker中topic信息


mqadmin查看信息,topic已经存在了


rocketmq学习笔记 一 hello world


4.接着执行example consumer

  1. /** 
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more 
  3.  * contributor license agreements.  See the NOTICE file distributed with 
  4.  * this work for additional information regarding copyright ownership. 
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0 
  6.  * (the "License"); you may not use this file except in compliance with 
  7.  * the License.  You may obtain a copy of the License at 
  8.  * 
  9.  *     http://www.apache.org/licenses/LICENSE-2.0 
  10.  * 
  11.  *  Unless required by applicable law or agreed to in writing, software 
  12.  *  distributed under the License is distributed on an "AS IS" BASIS, 
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14.  *  See the License for the specific language governing permissions and 
  15.  *  limitations under the License. 
  16.  */  
  17. package com.alibaba.rocketmq.example.quickstart;  
  18.   
  19. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  20. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  21. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  22. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  23. import com.alibaba.rocketmq.client.exception.MQClientException;  
  24. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  25. import com.alibaba.rocketmq.common.message.MessageExt;  
  26.   
  27. import java.io.UnsupportedEncodingException;  
  28. import java.util.List;  
  29.   
  30. public class Consumer {  
  31.   
  32.     public static void main(String[] args) throws InterruptedException, MQClientException {  
  33.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
  34.   
  35.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  36.           
  37.         consumer.setNamesrvAddr("127.0.0.1:9876");  
  38.           
  39.         consumer.subscribe("TopicTest", "*");  
  40.   
  41.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  42.   
  43.             @Override  
  44.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  
  45.                                                             ConsumeConcurrentlyContext context) {  
  46.                 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
  47.                 for(MessageExt me:msgs){  
  48.                     try {  
  49.                         System.out.println(new String(me.getBody(),"utf-8"));  
  50.                     } catch (UnsupportedEncodingException e) {  
  51.                         e.printStackTrace();  
  52.                     }  
  53.                 }  
  54.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  55.             }  
  56.         });  
  57.   
  58.         consumer.start();  
  59.   
  60.         System.out.println("Consumer Started.");  
  61.     }  
  62. }  


rocketmq学习笔记 一 hello world


ok,到这里,hello world 已经成功了