RocketMQ初体验
-
Producer
- 消息生产者,负责产生消息,一般由业务系统负责生产消息。
- Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
-
Consumer
- 消息消费者,负责消费消息,一般由后台系统异步消费消息。
- Consumer Group:消费者组,简单来说就是多个消费同一类消息的消费者称之为一个消费者组。在这里可以不用关心,只要知道有这么一个概念即可。
- Push Consumer:broker主动推送消息给consumer
- Pull Consumer:consumer主动去broker拉取消息
-
NameServer
- 集群架构中的组织协调员
- 负责收集Broker的工作情况
- 不负责处理消息
-
Broker
- 负责消息的发送、接收、高可用等(真正干活的)
- 需要定时发送自身状况(心跳)到NameServer(默认每30秒发送一次);NameServer每10秒扫描一次,超过2分钟(120秒)未收到心跳就会认为该Broker失效。
-
Topic
- 区分不同类型的消息,如user、order
-
Message Queue
- 消息队列,储存消息
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.2</version>
    </dependency>
</dependencies>
/**
 * Minimal synchronous producer demo.
 *
 * <p>Starts a producer in group {@code test-group}, sends 100 messages to topic
 * {@code TopicTest11} with tag {@code TagA}, prints every {@code SendResult}, and
 * then shuts the producer down.
 */
public class SyncProducer {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or a send fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("test-group");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.55.185:9876");
        // Launch the instance.
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Create a message instance, specifying topic, tag and message body.
                byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
                Message msg = new Message("TopicTest11" /* Topic */, "TagA" /* Tag */, body);
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Shut down once the producer is no longer in use, even when a send failed.
            producer.shutdown();
        }
    }
}
测试时若出现如下错误:
/**
 * Creates the topic {@code my-topic} with 8 queues through the producer API.
 */
public class TopicDemo {

    /**
     * Starts a producer, creates the topic, and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the topic cannot be created
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test");
        // Address of the name server.
        producer.setNamesrvAddr("172.16.55.185:9876");
        // Start the producer.
        producer.start();
        try {
            // Arguments: key, new topic name, number of queues.
            // NOTE(review): the original note calls the first argument "the broker name";
            // confirm "broker_test_im" matches the brokerName configured on the target broker.
            producer.createTopic("broker_test_im", "my-topic", 8);
            System.out.println("topic创建成功!");
        } finally {
            // Shut down even when topic creation fails.
            producer.shutdown();
        }
    }
}
/**
 * Sends a single message synchronously to {@code my-topic} with tag {@code delete}
 * and prints the details of the send result.
 */
public class SyncProducer {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();
        try {
            // Build and send the message.
            String msg = "我的第一个消息6!";
            Message message = new Message("my-topic", "delete", msg.getBytes("UTF-8"));
            SendResult sendResult = producer.send(message);

            System.out.println("消息id:" + sendResult.getMsgId());
            System.out.println("消息队列:" + sendResult.getMessageQueue());
            System.out.println("消息offset值:" + sendResult.getQueueOffset());
            System.out.println(sendResult);
        } finally {
            // Shut down even when the send throws.
            producer.shutdown();
        }
    }
}
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test");
producer.setNamesrvAddr("172.16.55.185:9876");
producer.start();
// 发送消息
String msg = "我的第一个异步发送消息!";
Message message = new Message("my-topic", msg.getBytes("UTF-8"));
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功了!" + sendResult);
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
}
public void onException(Throwable e) {
System.out.println("消息发送失败!" + e);
}
});
// producer.shutdown();
}
}
2.4、消费消息
/**
 * Push consumer demo for {@code my-topic}.
 *
 * <p>Subscribes with the tag expression {@code add || update} in clustering mode and
 * prints every received message body decoded as UTF-8.
 */
public class ConsumerDemo {

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("test-consumer");
        pushConsumer.setNamesrvAddr("172.16.55.185:9876");

        // Tag expression subscription; use "*" as the expression to receive every message instead.
        pushConsumer.subscribe("my-topic", "add || update");
        pushConsumer.setMessageModel(MessageModel.CLUSTERING);

        // Prints each body, then the whole batch, and always acknowledges the batch.
        MessageListenerConcurrently printer = (batch, ctx) -> {
            try {
                for (MessageExt received : batch) {
                    String text = new String(received.getBody(), "UTF-8");
                    System.out.println("消息:" + text);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            System.out.println("接收到消息 -> " + batch);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.registerMessageListener(printer);

        // Start the consumer.
        pushConsumer.start();
    }
}
/**
 * Sends one message to {@code my-topic-filter} carrying the user properties
 * {@code sex} and {@code age}, for use with property-based (SQL) filtering on the
 * consumer side.
 */
public class SyncProducer {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("haoke");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();
        try {
            // Build the message and attach the properties a consumer can filter on.
            String msg = "这是一个用户的消息, id = 1003";
            Message message = new Message("my-topic-filter", "delete", msg.getBytes("UTF-8"));
            message.putUserProperty("sex", "男");
            message.putUserProperty("age", "20");

            SendResult sendResult = producer.send(message);
            System.out.println("消息id:" + sendResult.getMsgId());
            System.out.println("消息队列:" + sendResult.getMessageQueue());
            System.out.println("消息offset值:" + sendResult.getQueueOffset());
            System.out.println(sendResult);
        } finally {
            // Shut down even when the send throws.
            producer.shutdown();
        }
    }
}
接收消息:
/**
 * Push consumer demo that filters {@code my-topic-filter} by message properties.
 *
 * <p>Subscribes with the SQL selector {@code sex='女' AND age>=18} and prints every
 * received message body decoded as UTF-8.
 */
public class ConsumerFilter {

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("haoke-consumer");
        pushConsumer.setNamesrvAddr("172.16.55.185:9876");

        // Property-based subscription: the selector is evaluated against message user properties.
        pushConsumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='女' AND age>=18"));

        // Prints each body, then the whole batch, and always acknowledges the batch.
        MessageListenerConcurrently printer = (batch, ctx) -> {
            try {
                for (MessageExt received : batch) {
                    String text = new String(received.getBody(), "UTF-8");
                    System.out.println("消息:" + text);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            System.out.println("接收到消息 -> " + batch);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.registerMessageListener(printer);

        // Start the consumer.
        pushConsumer.start();
    }
}
测试,报错:
/**
 * Ordered-message producer demo.
 *
 * <p>Sends 100 messages to {@code haoke_order_topic}. Each message carries a simulated
 * order id ({@code i % 10}); the queue selector maps the same order id to the same
 * queue index, so all messages of one order land in one queue.
 */
public class OrderProducer {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or a send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_ORDER_PRODUCER");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10; // simulated order id
                String msgStr = "order --> " + i + ", id = " + orderId;
                Message message = new Message("haoke_order_topic", "ORDER_MSG",
                        msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
                    Integer id = (Integer) arg;
                    // floorMod keeps the index inside [0, size) even for a negative id.
                    int index = Math.floorMod(id, mqs.size());
                    return mqs.get(index);
                }, orderId);
                System.out.println(sendResult);
            }
        } finally {
            // Shut down even when a send throws.
            producer.shutdown();
        }
    }
}
/**
 * Orderly consumer demo for {@code haoke_order_topic}.
 *
 * <p>For every received message it prints the consuming thread name, the queue id,
 * and the body decoded as UTF-8.
 */
public class OrderConsumer {

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("HAOKE_ORDER_CONSUMER");
        pushConsumer.setNamesrvAddr("172.16.55.185:9876");
        pushConsumer.subscribe("haoke_order_topic", "*");

        // Prints one line per message; a decoding failure only skips that message's line.
        MessageListenerOrderly printer = (batch, ctx) -> {
            for (MessageExt received : batch) {
                try {
                    String threadName = Thread.currentThread().getName();
                    int queueId = received.getQueueId();
                    String text = new String(received.getBody(), "UTF-8");
                    System.out.println(threadName + " " + queueId + " " + text);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        };
        pushConsumer.registerMessageListener(printer);

        pushConsumer.start();
    }
}
/**
 * Transactional-message producer demo.
 *
 * <p>Registers a {@code TransactionListenerImpl}, sends one transactional message to
 * {@code pay_topic}, then keeps the process alive for a while before shutting down.
 */
public class TransactionProducer {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if the producer cannot start, the send fails, or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
        producer.setNamesrvAddr("172.16.55.185:9876");
        // Register the transaction listener.
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();
        try {
            // Send the transactional message.
            Message message = new Message("pay_topic", "用户A给用户B转账500元".getBytes("UTF-8"));
            producer.sendMessageInTransaction(message, null);
            // Keep the process (and thus the registered listener) alive for roughly 1000 seconds.
            // NOTE(review): presumably this is to leave time for broker check-backs; confirm.
            Thread.sleep(999999);
        } finally {
            // Shut down even when the send fails or the sleep is interrupted.
            producer.shutdown();
        }
    }
}
/**
 * Transaction listener for the transfer demo.
 *
 * <p>{@link #executeLocalTransaction} simulates the local business steps and records
 * the outcome per transaction id; {@link #checkLocalTransaction} answers a check-back
 * from that record.
 */
public class TransactionListenerImpl implements TransactionListener {

    /**
     * Outcome of each executed local transaction, keyed by transaction id.
     * A concurrent map is used because the two callbacks share it and may run on
     * different threads (assumption about the client's threading — confirm).
     */
    private static final Map<String, LocalTransactionState> STATE_MAP =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Executes the local business logic (simulated with prints and sleeps).
     *
     * @param msg the message that was sent
     * @param arg the argument passed to {@code sendMessageInTransaction}; unused here
     * @return {@code COMMIT_MESSAGE} when the simulated steps complete,
     *     {@code ROLLBACK_MESSAGE} when any of them throws
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        LocalTransactionState outcome;
        try {
            System.out.println("用户A账户减500元.");
            Thread.sleep(500); // simulate a service call
            System.out.println("用户B账户加500元.");
            Thread.sleep(800);
            outcome = LocalTransactionState.COMMIT_MESSAGE;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            outcome = LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            e.printStackTrace();
            outcome = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        remember(msg.getTransactionId(), outcome);
        return outcome;
    }

    /**
     * Answers a transaction status check-back.
     *
     * @param msg the message whose transaction state is being checked
     * @return the recorded state, or {@code UNKNOW} when nothing was recorded for this
     *     transaction id (never {@code null})
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String transactionId = msg.getTransactionId();
        // ConcurrentHashMap rejects null keys, so a missing id is treated as "not recorded".
        LocalTransactionState recorded =
                transactionId == null ? null : STATE_MAP.get(transactionId);
        System.out.println("状态回查 ---> " + transactionId + " " + recorded);
        return recorded != null ? recorded : LocalTransactionState.UNKNOW;
    }

    /** Records the outcome for a transaction id; ids that are null are skipped. */
    private static void remember(String transactionId, LocalTransactionState outcome) {
        if (transactionId != null) {
            STATE_MAP.put(transactionId, outcome);
        }
    }
}
/**
 * Push consumer demo for {@code pay_topic}: prints the UTF-8 body of every message
 * it receives.
 */
public class TransactionConsumer {

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("HAOKE_CONSUMER");
        pushConsumer.setNamesrvAddr("172.16.55.185:9876");
        // Subscribe to the topic with "*" so every message under it is received.
        pushConsumer.subscribe("pay_topic", "*");

        // Prints each body; a decoding failure only skips that message. Always acknowledges.
        MessageListenerConcurrently printer = (batch, ctx) -> {
            for (MessageExt received : batch) {
                try {
                    String text = new String(received.getBody(), "UTF-8");
                    System.out.println(text);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.registerMessageListener(printer);

        pushConsumer.start();
    }
}