activemq api基本使用 与 Spring整合JMS(一)
下载安装 (centos)
- 下载activeMQ安装包
https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.8/apache-activemq-5.15.8-bin.tar.gz
- 解压安装包
tar xzf apache-activemq-5.15.8-bin.tar.gz
- 进入bin目录启动
sh activemq start
测试
http://192.168.1.103:8161/admin/
用户名:admin
密码:admin
点对点(一对一)
- 每个消息只能有一个消费者
- 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息
生产者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Point-to-point producer example: sends one text message to the queue
 * {@code first-queue} on the broker at {@code tcp://192.168.1.103:61616}.
 */
public class JmsSender {

    /**
     * Opens a connection, sends a single message inside a transacted session,
     * and always closes the connection afterwards.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.103:61616");
        System.out.println(factory);
        Connection conn = null;
        try {
            // Open and start the connection to the broker.
            conn = factory.createConnection();
            conn.start();
            sendOnce(conn);
        } catch (JMSException ex) {
            ex.printStackTrace();
        } finally {
            closeConnection(conn);
        }
    }

    /** Sends the demo text message to "first-queue" and commits the transaction. */
    private static void sendOnce(Connection conn) throws JMSException {
        // Transacted session: nothing is delivered until commit() is called.
        Session txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // The queue is looked up by name; "first-queue" is the destination name.
        Destination queue = txSession.createQueue("first-queue");
        MessageProducer sender = txSession.createProducer(queue);

        TextMessage payload = txSession.createTextMessage();
        payload.setText("这是我的消息,你接收到了吗??");
        payload.setStringProperty("name", "张三");

        sender.send(payload);
        txSession.commit();
        txSession.close();
    }

    /** Closes the connection if it was opened, printing any failure raised by close(). */
    private static void closeConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}
消费者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.103:61616");
System.out.println(connectionFactory);
Connection connection = null;
try {
// 创建连接
connection = connectionFactory.createConnection();
connection.start();
// 创建会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建队列,第一次才创建,first-queue表示名称,destination表示目的地
Destination destination = session.createQueue("first-queue");
MessageConsumer consumer = session.createConsumer(destination);
TextMessage message = (TextMessage) consumer.receive();
String text = message.getText();
String name = message.getStringProperty("name");
System.out.println(text);
System.out.println(name);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
测试
-
运行生产者,通过页面管理可以看到有一条消息,未被处理
-
运行消费者,消费者拿到生产者数据
通过管理员页面查看,生产的消息已经被消费
发布订阅(pub/sub)(一对多)
- 每个消息可以有多个消费者
- 消息的生产者和消费者之间存在时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息。
- JMS规范允许客户端创建持久订阅(durable subscription),持久订阅者可以收到其离线期间发布到该主题的消息
生产者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publish/subscribe producer example: publishes one text message to the topic
 * {@code first-topic} on the broker at {@code tcp://192.168.1.103:61616}.
 */
public class JmsTopicSender {

    /**
     * Opens a connection, publishes a single message inside a transacted
     * session, and always closes the connection afterwards.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ConnectionFactory brokerFactory = new ActiveMQConnectionFactory("tcp://192.168.1.103:61616");
        System.out.println(brokerFactory);
        Connection brokerConnection = null;
        try {
            // Open and start the connection to the broker.
            brokerConnection = brokerFactory.createConnection();
            brokerConnection.start();

            // Transacted session: the message is published when commit() is called.
            Session txSession = brokerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            // The topic is looked up by name; "first-topic" is the destination name.
            Destination topic = txSession.createTopic("first-topic");
            MessageProducer publisher = txSession.createProducer(topic);

            TextMessage announcement = txSession.createTextMessage();
            announcement.setText("只是topic,大家收到了吗");
            announcement.setStringProperty("name", "主播A");
            publisher.send(announcement);

            txSession.commit();
            txSession.close();
        } catch (JMSException failure) {
            failure.printStackTrace();
        } finally {
            release(brokerConnection);
        }
    }

    /** Closes the connection when one was opened; a failure to close is only printed. */
    private static void release(Connection brokerConnection) {
        if (brokerConnection != null) {
            try {
                brokerConnection.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
消费者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsTopicReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.103:61616");
System.out.println(connectionFactory);
Connection connection = null;
try {
// 创建连接
connection = connectionFactory.createConnection();
connection.start();
// 创建会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建队列,第一次才创建,first-queue表示名称,destination表示目的地
Destination destination = session.createTopic("first-topic");
MessageConsumer consumer = session.createConsumer(destination);
TextMessage message = (TextMessage) consumer.receive();
String text = message.getText();
String name = message.getStringProperty("name");
System.out.println(text);
System.out.println(name);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
测试
- 先启动两个消费者,先订阅topic
管理员页面
- 生产者发送消息后,两个消费者均收到消息
管理员页面
事务
事务性会话
当第一个入参置为true时,第二个参数(应答模式)不起作用,会话将进行类似于数据库事务的操作,需要手动执行commit提交或rollback回滚:
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
如果消费者不执行commit操作,事务就没有提交,broker会认为消息尚未被消费,因此该消息之后会被再次投递(可以被重复消费):
// session.commit();
非事务性会话
当第一个入参置为false时,消息何时被确认取决于创建会话时指定的应答模式(acknowledge mode)
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
- AUTO_ACKNOWLEDGE
当客户端成功从receive方法返回以后,或者[MessageListener.onMessage] 方法成功返回以后,会话会自动确认该消息
// 创建会话
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// session.commit();
- CLIENT_ACKNOWLEDGE
客户端通过调用消息的textMessage.acknowledge();确认消息。
在这种模式中,确认是以会话为单位进行的:假设一个消费者一共要消费10个消息,在消费了5个消息之后,对第5个消息调用textMessage.acknowledge(),那么该会话在此之前消费的所有消息(第1~5个)都会被一并确认
// 创建会话
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
message.acknowledge();
message.acknowledge();可做批量确认
- DUPS_OK_ACKNOWLEDGE
延迟确认:会话可以延后(批量)确认消息,因此消息可能被重复投递,只适用于消费者能够容忍重复消息的场景
Spring整合JMS
pom文件
<!-- ActiveMQ "all-in-one" artifact, same version (5.15.8) as the broker installed above -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.8</version>
</dependency>
<!-- Spring context, used to integrate ActiveMQ with Spring -->
<!-- NOTE(review): org.springframework.jms.core.JmsTemplate lives in the spring-jms module; confirm it reaches the classpath or declare spring-jms explicitly -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.9.RELEASE</version>
</dependency>
<!-- Supporting dependency: Commons Pool 2 (presumably required by the pooled connection factory; confirm) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
生产者
service-jms-sender.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
<!-- Spring bean definitions for the JMS producer side. -->
<!-- NOTE(review): the dubbo namespace declared on the root element is not used by any element in this file. -->
<!-- Pooled connection factory wrapping an ActiveMQConnectionFactory; maxConnections is 50 and stop() is registered as the destroy method. -->
<!-- NOTE(review): the broker URL here (176.16.0.135) differs from the 192.168.1.103 address used by the plain JMS examples; confirm which broker is intended. -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://176.16.0.135:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="50"/>
</bean>
<!-- Default destination: the queue named "spring-queue". -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue"/>
</bean>
<!-- Alternative topic destination ("spring-topic"), currently disabled. -->
<!--<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>-->
<!-- JmsTemplate bound to the pooled factory and the default destination, converting payloads with SimpleMessageConverter. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
对应java代码
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class SpringJmsSender {
public static void main(String[] args) {
ClassPathXmlApplicationContext context=
new ClassPathXmlApplicationContext(
"classpath:service-jms-sender.xml");
JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message=session.createTextMessage();
message.setText("Hello,张三");
return message;
}
});
}
}
消费端
配置文件service-jms-reveiver.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
<!-- Spring bean definitions for the JMS consumer side. -->
<!-- NOTE(review): the dubbo namespace declared on the root element is not used by any element in this file. -->
<!-- Pooled connection factory wrapping an ActiveMQConnectionFactory; maxConnections is 50 and stop() is registered as the destroy method. -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://176.16.0.135:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="50"/>
</bean>
<!-- Default destination: the queue named "spring-queue". -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue"/>
</bean>
<!-- Alternative topic destination ("spring-topic"), currently disabled. -->
<!-- <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>-->
<!-- JmsTemplate used by the blocking receiver; bound to the pooled factory and the default destination. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- Listener container used by the non-blocking mode: passes messages from the destination to the messageListener bean. -->
<!-- NOTE(review): the blocking receiver loads this same file, so this container is defined there too and may compete with it for messages on spring-queue; confirm this is intended. -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
<!-- Listener implementation. NOTE(review): the class name assumes SpringJmsListener is declared in package priv.dengjl.activemq.springjms; confirm. -->
<bean id="messageListener" class="priv.dengjl.activemq.springjms.SpringJmsListener"/>
</beans>
阻塞模式
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
 * Spring-based consumer example (blocking mode): loads
 * {@code service-jms-reveiver.xml} and receives one message synchronously
 * through the {@code jmsTemplate} bean.
 */
public class SpringJmsReceiver {

    /**
     * Boots the Spring context, receives and prints one converted message, and
     * then closes the context.
     *
     * <p>The converted payload is kept as an {@code Object} rather than cast to
     * {@code String}, so a non-text payload is printed instead of raising a
     * {@code ClassCastException}; text payloads print exactly as before. The
     * context is closed in a {@code finally} block so that bean destruction
     * callbacks (such as the pooled connection factory's {@code stop}) run.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext(
                        "classpath:service-jms-reveiver.xml");
        try {
            // Blocking style: receive directly through the template.
            JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
            Object msg = jmsTemplate.receiveAndConvert();
            System.out.println(msg);
        } finally {
            // Release the listener container and pooled connections defined in the XML.
            context.close();
        }
    }
}
非阻塞模式
引入listener
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Message listener for the non-blocking mode: prints the text of every
 * {@link TextMessage} it is given.
 */
public class SpringJmsListener implements MessageListener {

    /**
     * Prints the text of the received message.
     *
     * <p>Messages that are not {@link TextMessage} instances are reported on
     * standard error and skipped, rather than failing with a
     * {@code ClassCastException} inside the listener.
     *
     * @param message the message handed to this listener
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring message that is not a TextMessage: " + message);
            return;
        }
        try {
            System.out.println(((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
启动方式
import java.io.IOException;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
/**
 * Spring-based consumer example (non-blocking mode): loads
 * {@code service-jms-reveiver.xml}, whose listener container delivers messages
 * to the configured message listener, and keeps the process alive until input
 * arrives on standard input.
 */
public class SpringJmsReceiver2Listener {

    /**
     * Boots the Spring context, waits for input on standard input, and then
     * closes the context.
     *
     * <p>The context is closed in a {@code finally} block so that the listener
     * container and the pooled connection factory defined in the XML are shut
     * down when the program ends, instead of being left open.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext(
                        "classpath:service-jms-reveiver.xml");
        try {
            // Non-blocking, listener-driven: just keep the JVM alive until input arrives.
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
    }
}