springboot集成activeMQ
首先从 http://www.apache.org/dyn/closer.cgi?path=/activemq/5.9.1/apache-activemq-5.9.1-bin.zip 下载对应的 ActiveMQ 安装包并解压。
运行 apache-activemq-5.9.1/bin/win64/wrapper.exe 启动服务后，在浏览器中访问 http://localhost:8161/ ，输入账户密码登录管理控制台（默认账户/密码为 admin/admin）。
进入如下界面:
1.首先在pom.xml中添加依赖:
<!-- Spring Boot starter that integrates the ActiveMQ message queue -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Add this only when connection pooling is configured (spring.activemq.pool.*) -->
<!-- NOTE(review): newer Spring Boot versions (2.1+) expect org.messaginghub:pooled-jms for pooling; confirm against the Boot version in use -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
2. 在yml中配置参数:(这里我们打开点对点和订阅模式)
spring:
  activemq:
    # Broker endpoint and credentials used by the auto-configured connection factory.
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    pool:
      # The property is "enabled"; the misspelt "enable" is ignored and leaves pooling off.
      enabled: true
      max-connections: 100
  jms:
    # true = destinations resolved by name default to topics (publish/subscribe).
    pub-sub-domain: true
3. 写service接口ActiveMqService.java
/**
 * Contract for sending text messages through ActiveMQ.
 */
public interface ActiveMqService {

    /**
     * Sends a message to a destination chosen by the caller.
     *
     * @param destination the caller-supplied destination to send to
     * @param message     the text to send
     */
    void sendMessage(Destination destination, String message);

    /**
     * Sends a message to the default destination.
     *
     * @param message the text to send
     */
    void sendMessage(String message);
}
4. 写service实现类ActiveMqServiceImpl.java
/**
 * {@link ActiveMqService} implementation that hands every message to the
 * injected {@link JmsMessagingTemplate}.
 */
@Service("ActiveMqService")
public class ActiveMqServiceImpl implements ActiveMqService {

    /** Template that performs the actual conversion and send. */
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * Converts {@code message} and sends it to the given destination.
     *
     * @param destination destination to send to
     * @param message     text payload
     */
    @Override
    public void sendMessage(Destination destination, String message) {
        this.jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * Converts {@code message} and sends it without naming a destination,
     * leaving the choice of destination to the template.
     *
     * @param message text payload
     */
    @Override
    public void sendMessage(String message) {
        this.jmsTemplate.convertAndSend(message);
    }
}
5.写Junit测试
写基类避免重复导入注解
/**
 * Base class for Spring Boot tests: carries the runner and context annotations
 * once and logs a marker line before and after each test method.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class WalletTests {

    private Logger logger = LoggerFactory.getLogger(WalletTests.class);

    /** Logs the "test started" marker before each test. */
    @Before
    public void init() {
        this.logger.info("开始测试-----------------");
    }

    /** Logs the "test finished" marker after each test. */
    @After
    public void after() {
        this.logger.info("测试结束-----------------");
    }
}
测试P2P模式
public class ActiveMqTest extends WalletTests {
@Autowired
public ActiveMqService activeMqService;
private Logger logger = LoggerFactory.getLogger(ActiveMqTest.class);
@Test //test p2p send
public void testCreateP2pQueue(){
Destination testD = new ActiveMQQueue("queueMq2");
activeMqService.sendMessage(testD,"testMq2");
activeMqService.sendMessage(testD,"testMq3");
}
@Test //test p2p receive
@JmsListener(destination="queueMq2")
public void receiveQueue(){
/* String text="";
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
try {
text = in.readLine();
in.close();
} catch (IOException e) {
e.printStackTrace();
}
logger.info("testMq收到的报文为:"+text);*/
logger.info("testMq收到的报文为:"+System.in);
}
public Session configSession(boolean commit) throws Exception{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(commit, Session.AUTO_ACKNOWLEDGE);
return session;
}
@Test //test p2p send
public void testProduceMsg() throws Exception{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("testActiveMQ");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("activeMQ+"+i));
}
session.commit();
producer.close();
session.close();
connection.close();
}
@Test //test p2p receive
public void testConsumeMsg() throws Exception{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("testActiveMQ");
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true) {
TextMessage message = (TextMessage) messageConsumer.receive(5000);
if (message != null) {
logger.info("testMq收到的报文为:"+message.getText());
} else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
public MessageListener monitorConsumer(String mqString){
MessageListener messageListener = new MessageListener() {
public void onMessage(Message message) {
TextMessage text = (TextMessage) message;
try {
logger.info(mqString + text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
};
return messageListener;
}
@Test //test p2p monitor receive
public void testConsumeMsgMonitor() throws Exception {
Session session = configSession(false);
Queue queue = session.createQueue("testActiveMQ");
String logHead = "testMq收到的报文为:";
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(monitorConsumer(logHead));
while(true){
}
}
@Test //test subscribe send
public void testCreateSubscribe(){
Destination testD = new ActiveMQTopic("topicMq");
activeMqService.sendMessage(testD,"topicMqtest1");
activeMqService.sendMessage(testD,"topicMqtest2");
}
@Test //test subscribe send
public void testCreateSubscribeMsg() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test_topic");
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("activeMQTopic+"+i));
}
session.commit();
producer.close();
session.close();
connection.close();
}
@Test //test subscribe monitor receive
@Deprecated
public void testSubscribeConsumeMsg() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test_topic");
MessageConsumer messageConsumer = session.createConsumer(topic);
while (true) {
TextMessage message = (TextMessage) messageConsumer.receive(5000);
if (message != null) {
logger.info("testMq收到的报文为:"+message.getText());
} else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
@Test //test subscribe monitor receive
public void testSubscribeConsumeMsgMonitor() throws Exception {
Session session = configSession(false);
Topic topic = session.createTopic("test_topic");
String logHead = "testMqTopic收到的报文为:";
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener(monitorConsumer(logHead));
while(true){}
}
}
运行testProduceMsg添加队列数据
运行testConsumeMsg获取队列数据
此时再检查，消息已经出列
监听测试运行testConsumeMsgMonitor后运行testProduceMsg。可以实时监听
开启监听
发送队列消息
自动接收
订阅模式
运行testSubscribeConsumeMsg监听
再运行testCreateSubscribeMsg发布消息
可以看到消息已被成功接收