springboot整合activemq轻松入门

springboot整合activemq轻松入门

说明:本文参考自互联网

 

ActiveMQ 的配置文件 activemq.xml 中通过 simpleAuthenticationPlugin 配置了用户名和密码(本文均为 root),客户端连接时需要使用相同的用户名和密码

activemq.xml完整配置:

<!--

Licensed to the Apache Software Foundation (ASF) under one or more

contributor license agreements. See the NOTICE file distributed with

this work for additional information regarding copyright ownership.

The ASF licenses this file to You under the Apache License, Version 2.0

(the "License"); you may not use this file except in compliance with

the License. You may obtain a copy of the License at

 

http://www.apache.org/licenses/LICENSE-2.0

 

Unless required by applicable law or agreed to in writing, software

distributed under the License is distributed on an "AS IS" BASIS,

WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

See the License for the specific language governing permissions and

limitations under the License.

-->

<!-- START SNIPPET: example -->

<beans

xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

 

<!-- Allows us to use system properties as variables in this configuration file -->

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

<property name="locations">

<value>file:${activemq.conf}/credentials.properties</value>

</property>

</bean>

 

<!-- Allows accessing the server log -->

<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"

lazy-init="false" scope="singleton"

init-method="start" destroy-method="stop">

</bean>

 

<!--

The <broker> element is used to configure the ActiveMQ broker.

-->

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

 

<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry topic=">" >

<!-- The constantPendingMessageLimitStrategy is used to prevent

slow topic consumers to block producers and affect other consumers

by limiting the number of messages that are retained

For more information, see:

 

http://activemq.apache.org/slow-consumer-handling.html

 

-->

<pendingMessageLimitStrategy>

<constantPendingMessageLimitStrategy limit="1000"/>

</pendingMessageLimitStrategy>

</policyEntry>

</policyEntries>

</policyMap>

</destinationPolicy>

 

 

<!--

The managementContext is used to configure how ActiveMQ is exposed in

JMX. By default, ActiveMQ uses the MBean server that is started by

the JVM. For more information, see:

 

http://activemq.apache.org/jmx.html

-->

<managementContext>

<managementContext createConnector="false"/>

</managementContext>

 

<!--

Configure message persistence for the broker. The default persistence

mechanism is the KahaDB store (identified by the kahaDB tag).

For more information, see:

 

http://activemq.apache.org/persistence.html

-->

<persistenceAdapter>

<kahaDB directory="${activemq.data}/kahadb"/>

</persistenceAdapter>

 

 

<!--

The systemUsage controls the maximum amount of space the broker will

use before disabling caching and/or slowing down producers. For more information, see:

http://activemq.apache.org/producer-flow-control.html

-->

<systemUsage>

<systemUsage>

<memoryUsage>

<memoryUsage percentOfJvmHeap="70" />

</memoryUsage>

<storeUsage>

<storeUsage limit="100 gb"/>

</storeUsage>

<tempUsage>

<tempUsage limit="50 gb"/>

</tempUsage>

</systemUsage>

</systemUsage>

 

<!--

The transport connectors expose ActiveMQ over a given protocol to

clients and other brokers. For more information, see:

 

http://activemq.apache.org/configuring-transports.html

-->

<transportConnectors>

<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

 

<!-- destroy the spring context on shutdown to stop jetty -->

<shutdownHooks>

<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />

</shutdownHooks>

<!--
Simple authentication: declares a single broker user "root" with password "root",
member of the "users" and "admins" groups. The Spring Boot client settings in this
article (spring.activemq.user / spring.activemq.password) use the same values.
NOTE(review): the credentials are stored here in plain text and are demo values;
replace them before exposing the broker outside a test environment.
-->

<plugins>

<simpleAuthenticationPlugin>

<users>

<authenticationUser username="root" password="root" groups="users,admins"/>

</users>

</simpleAuthenticationPlugin>

</plugins>

</broker>

<!--

Enable web consoles, REST and Ajax APIs and demos

The web consoles requires by default login, you can disable this in the jetty.xml file

 

Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details

-->

<import resource="jetty.xml"/>

 

</beans>

<!-- END SNIPPET: example -->

 

 

项目结构

springboot整合activemq轻松入门

pom文件:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.1.6.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<groupId>pers.wwz</groupId>

<artifactId>activemq</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>activemq</name>

<description>Demo project for Spring Boot Activemq</description>

 

<properties>

<java.version>1.8</java.version>

</properties>

 

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

 

<!--
Connection-pool library for the ActiveMQ ConnectionFactory.
Since Spring Boot 2.1 the pooled factory is created from org.messaginghub:pooled-jms;
activemq-pool is no longer picked up by the auto-configuration, so with the 2.1.6
parent used here it is pooled-jms that must be on the classpath once
spring.activemq.pool.enabled is switched to true.
The version is managed by spring-boot-starter-parent, so none is declared here.
-->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>

 

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>

 

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

 

</project>

配置文件:

# Address of the external broker (TCP, port 61616 - the "openwire" connector in activemq.xml)

spring.activemq.broker-url=tcp://192.168.168.161:61616

spring.activemq.in-memory=false

# true enables the connection pool. It is false here, so the pool.* values below
# only take effect after this is switched to true.

spring.activemq.pool.enabled=false

# Maximum number of connections in the pool

spring.activemq.pool.max-connections=5

# Expiry time for idle connections; the default is 30 seconds

spring.activemq.pool.idle-timeout=30000

# Forced connection expiry time. The difference from idleTimeout: idleTimeout expires a
# connection after it has been idle for a while, whereas expiryTimeout expires it once the
# given time is reached regardless of the connection's current state. Default is 0 (never).

#spring.activemq.pool.expiry-timeout=0

#spring.activemq.pool.time-between-expiration-check=0

# Broker credentials; they match the user declared in activemq.xml (root/root)

spring.activemq.password=root

spring.activemq.user=root

 

启动类:

package pers.wwz.activemq;

 

import org.apache.activemq.command.ActiveMQQueue;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.annotation.Bean;

 

@SpringBootApplication

public class ActivemqApplication {

 

@Bean

public ActiveMQQueue queue() {

return new ActiveMQQueue("promoteAct");

}

 

public static void main(String[] args) {

SpringApplication.run(ActivemqApplication.class, args);

}

 

}

 

生产者:

package pers.wwz.activemq.producer;

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsMessagingTemplate;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

 

import javax.jms.Queue;

 

@Component

@EnableScheduling

public class PromoteActProducer {

 

@Autowired

private JmsMessagingTemplate jmsMessagingTemplate;

 

@Autowired

private Queue queue;

 

@Scheduled(fixedDelay = 2000) // 每2s执行1次

public void send() {

this.jmsMessagingTemplate.convertAndSend(this.queue, "hello,activeMQ");

}

}

 

消费者:

package pers.wwz.activemq.consumer;

 

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

 

@Component

public class PromoteActConsumer {

 

@JmsListener(destination = "promoteAct")

public void receiveQueue(String message) {

System.out.println(message+"消息已经消费了");

}

}