spring boot整合ActiveMQ实现

一、安装ActiveMQ

注意:JDK版本需要1.7及以上才行

到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/download.html,解压后的目录结构如下:

spring boot整合ActiveMQ实现

如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目录下的activemq.bat,运行结果如下:

spring boot整合ActiveMQ实现

启动成功!成功之后在浏览器输入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理页面,用户名和密码默认都是admin,如下:spring boot整合ActiveMQ实现

二. 整合步骤

2.1  pom.xml

 

<!-- activemq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <!-- activemq -->

2.2 application.yml

 

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
    in-memory: true
    # 等待消息发送响应的时间。设置为0等待永远。
    send-timeout: 0
    # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
    non-blocking-redelivery: false
    # 是否用Pooledconnectionfactory代替普通的ConnectionFactory
    pool:
      enabled: true
    packages:
      trust-all: true   # 如果使用ObjectMessage传输对象,必须要加上这个信任包,否则会报ClassNotFound异常
  jms:
    #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
    pub-sub-domain: true

2.3 ActiveMqConfig

 

package com.wondertek.oes.workbench.collect.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@Configuration
public class ActiveMqConfig {


    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }


    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

}
 

2.4  MqProducer

 

package com.wondertek.oes.workbench.collect.mq;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;

@Service
public class MqProducer {


    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;


    /**
     * 发送字符串消息队列
     *
     * @param queueName 队列名称
     * @param message   字符串
     */
    public void sendStringQueue(String queueName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }


    /**
     * 发送字符串集合消息队列
     *
     * @param queueName 队列名称
     * @param list      字符串集合
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);
    }


    /**
     * 发送对象消息队列
     *
     * @param queueName 队列名称
     * @param obj       对象
     */
    public void sendObjQueue(String queueName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);
    }


    /**
     * 发送对象集合消息队列
     *
     * @param queueName 队列名称
     * @param objList   对象集合
     */
    public void sendObjListQueue(String queueName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);
    }


    /**
     * 发送字符串消息主题
     *
     * @param topicName 主题名称
     * @param message   字符串
     */
    public void sendStringTopic(String topicName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);
    }


    /**
     * 发送字符串集合消息主题
     *
     * @param topicName 主题名称
     * @param list      字符串集合
     */
    public void sendStringListTopic(String topicName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);
    }


    /**
     * 发送对象消息主题
     *
     * @param topicName 主题名称
     * @param obj       对象
     */
    public void sendObjTopic(String topicName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);
    }


    /**
     * 发送对象集合消息主题
     *
     * @param topicName 主题名称
     * @param objList   对象集合
     */
    public void sendObjListTopic(String topicName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);
    }
    
    //@JmsListener(destination="out.queue")
    @JmsListener(destination = "out.queue", containerFactory = "jmsListenerContainerQueue")
    public void consumerMessage(String text){
        System.out.println("从out.queue队列收到的回复报文为:"+text);
    }

}
 

2.5 QueueConsumer

 

package com.wondertek.oes.workbench.collect.mq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

@Component
public class QueueConsumer {
    
    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        System.out.println("接收到集合队列消息...." + list);
    }


    @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到对象队列消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到的对象队列消息..." + objectMessage.getObject());
    }


}
 

2.6 TopicConsumer

 注: A主题消费者,用来接收主题类消息。目前搭了俩个消费者,一个A,一个B。

2.6.1 ATopicConsumer

package com.wondertek.oes.workbench.collect.mq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

@Component
public class ATopicConsumer {

    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("ATopicConsumer接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("ATopicConsumer接收到集合主题消息...." + list);
    }


    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    @SendTo("out.queue")
    public String receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
        return "ATopicConsumer:"+objectMessage.getObject();
    }

}
 

2.6.2 BTopicConsumer

实现双向队列 

我们在receiveObjListTopic方法上面多加了一个注解@SendTo("out.queue"),该注解的意思是将return回的值,再发送的"out.queue"队列中(生产者有接受方法)

package com.wondertek.oes.workbench.collect.mq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

@Component
public class BTopicConsumer {

    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("BTopicConsumer接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("BTopicConsumer接收到集合主题消息...." + list);
    }


    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到对象主题消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    @SendTo("out.queue")
    public String receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
        return "BTopicConsumer:"+objectMessage.getObject();
    }

}
 

2.7 User类

  注: 用来测试对象的类,复写了toString方法。必须实现Serializable接口

package com.wondertek.oes.workbench.collect.mq;

import java.io.Serializable;

public class User implements Serializable {

    private String id;
    private String name;
    private Integer age;

    public User() {
    }

    public User(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
 

2.8 单元测试类

package com.wondertek.oes.workbench.collect;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.wondertek.oes.workbench.collect.mq.MqProducer;
import com.wondertek.oes.workbench.collect.mq.User;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    @Autowired
    private MqProducer mqProducer;


    @Test
    public void testStringQueue() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次发送字符串队列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + i);
        }
    }


    @Test
    public void testStringListQueue() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在发送集合队列消息ing......");
        mqProducer.sendStringListQueue("stringListQueue", idList);
    }


    @Test
    public void testObjQueue() {

        System.out.println("正在发送对象队列消息......");
        mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));
    }


    @Test
    public void testObjListQueue() {

        System.out.println("正在发送对象集合队列消息......");

        List<Serializable> userList = new ArrayList<>();
        userList.add(new User("1", "小明", 21));
        userList.add(new User("2", "小雪", 22));
        userList.add(new User("3", "小花", 23));

        mqProducer.sendObjListQueue("objListQueue", userList);
    }


    @Test
    public void testStringTopic() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次发送字符串主题消息");
            mqProducer.sendStringTopic("stringTopic", "消息:" + i);
        }
    }


    @Test
    public void testStringListTopic() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在发送集合主题消息ing......");
        mqProducer.sendStringListTopic("stringListTopic", idList);
    }


    @Test
    public void testObjTopic() {

        System.out.println("正在发送对象主题消息......");
        mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));
    }


    @Test
    public void testObjListTopic() {

        System.out.println("正在发送对象集合主题消息......");

        List<Serializable> userList = new ArrayList<>();
        userList.add(new User("1", "小明", 21));
        userList.add(new User("2", "小雪", 22));
        userList.add(new User("3", "小花", 23));

        mqProducer.sendObjListTopic("objListTopic", userList);
    }
}