ActiveMQ在发布-订阅模式下的实例

消息队列一般有两种模型

1.点对点模型(基于队列 Point to Point,PTP) 每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的 相关性.可以有多个发送者,但只能被一个消费者消费。 一个消息只能被一个接受者接受一次 生产者把消息发送到队列中(Queue),接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态

2.  发布者/订阅者模型(基于主题的Publish/Subscribe,pub/sub) 每个消息可以有多个消费者。 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消 费自它订阅之后发布的消息. 允许多个接受者,类似于广播的方式 生产者将消息发送到主题上(Topic) 接受者必须先订阅  注:持久化订阅者:特殊的消费者,告诉主题,我一直订阅着,即使网络断开,消息服务器也记住所有持久化订阅者,如果有新消息,也会知道必定有人回来消费。

在SpringBoot中写的实例

连接ActiveMQ工具类:

 
/*
 * Copyright @ 2019 com.iflysse.trains
 * 01SpringBoot 下午2:18:30
 * All right reserved.
 *
 */
 
package com.dcx.comm.utils;
 
import java.util.List;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
 
/**
 * @ClassName: ActiveMqUtils
 * @author: cxding
 * @createTime: 2019年4月26日 下午2:18:30
 * @version: v1.0
 */
@Component
public class ActiveMqUtils {
	/**
	 * 注入JMS
	 */
	@Autowired
	private JmsTemplate jmsTemplate;
 
 
	/**
	 * 设置普通
	 * @Title: sendNorMolMessage 
	 * @author: cxding
	 * @createTime: 2019年4月28日 下午1:03:56
	 * @param destination
	 * @param text void
	 */
	public <T> void sendNorMolMessage(Destination destination, String text) {
		// 连接工厂
		ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
		Connection connection = null;
		Session session = null;
		MessageProducer producer = null;
		try {
			// 创建链接
			connection = connectionFactory.createConnection();
			connection.start();
			// 创建session,开启事物
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			// 创建生产者
			producer = session.createProducer(destination);
			// 设置持久化
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			// 设置过期时间
			//producer.setTimeToLive(time);
			TextMessage message = session.createTextMessage(text);
			producer.send(message);
			// 提交
			session.commit();
		} catch (JMSException e) {
			throw new RuntimeException(e);
		} finally {
			// 关闭连接
			close(producer, session, connection, connectionFactory);
		}
	}
 
}

连接信息配置application.properties

#-------------------------------activeMQ--------------------------------
# active mq
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=root
spring.activemq.password=pass
#默认是点对点
#default point to point
#现在需要订阅发布模式
spring.jms.pub-sub-domain=true

#默认是点对点
#default point to point
#现在需要订阅发布模式
spring.jms.pub-sub-domain=true

控制层

@PostMapping("/producertopic")
	@ApiOperation(value="产生消息队列(订阅发布模式,监听器目前只监听:“topicSend”)",response=Result.class)
	public Result<String> createTopicMq(@ApiParam(value = "队列目的地", required = false,defaultValue="topicSend") @RequestParam String topicName,
			@ApiParam(value = "队列消息内容", required = false) @RequestParam String text){
		Result<String> result = stuService.createTopicMq(topicName,text);
		return result;
	}

生产者:实现层

/**
	 * 发布主题产生消息队列
	 */
	@Override
	public Result<String> createTopicMq(String topicName, String text) {
		// TODO 发布-订阅模式--修改其中的Destination
		Destination destination = new ActiveMQTopic(topicName);
		for (int i = 0; i < 10; i++) {
			utils.sendNorMolMessage(destination, text+i);
		}
		return new Result<String>("主题消息已经产生",true);
	}

监听消费,设置两个订阅者

@Component
public class ActiveMqListenConfig {
	
	@Autowired
	private ActiveMqUtils util;
	private Logger logger = LoggerFactory.getLogger(this.getClass());
	
	@JmsListener(destination="topicSend")
	 public void recieveTopicTaskMq(String message) {
		logger.info("消费者1订阅收到的消息是:"+message+",并且这条消息已经被消费了");
	 }
	
	@JmsListener(destination="topicSend")
	 public void recieveTopicTaskMq2(String message) {
		logger.info("消费者2订阅收到到的消息是:"+message+",并且这条消息已经被消费了");
	 }

运行项目后

ActiveMQ在发布-订阅模式下的实例

查看消息

ActiveMQ在发布-订阅模式下的实例

控制台输出结果

 

ActiveMQ在发布-订阅模式下的实例

两个消费者分别消费了10条消息