activemq api基本使用 与 Spring整合JMS(一)

下载安装 (centos)

  1. 下载activeMQ安装包
    https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.8/apache-activemq-5.15.8-bin.tar.gz
  2. 解压安装包
    tar xzf apache-activemq-5.15.8-bin.tar.gz
    activemq api基本使用 与 Spring整合JMS(一)
  3. 进入bin目录启动
    sh activemq start
    activemq api基本使用 与 Spring整合JMS(一)

测试
http://192.168.1.103:8161/admin/
用户名:admin
密码:admin
activemq api基本使用 与 Spring整合JMS(一)

点对点(一对一)

  1. 每个消息只能有一个消费者
  2. 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point producer example: sends a single text message to the queue
 * {@code first-queue} through a transacted session and then disconnects.
 */
public class JmsSender {
	private static final String BROKER_URL = "tcp://192.168.1.103:61616";
	private static final String QUEUE_NAME = "first-queue";

	public static void main(String[] args) {
		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
		System.out.println(factory);
		Connection conn = null;
		try {
			// Open and start the connection to the broker
			conn = factory.createConnection();
			conn.start();
			sendOneMessage(conn);
		} catch (JMSException ex) {
			ex.printStackTrace();
		} finally {
			closeConnection(conn);
		}
	}

	/** Builds the text message, sends it to the queue and commits the transaction. */
	private static void sendOneMessage(Connection conn) throws JMSException {
		// Transacted session: nothing is delivered until commit() is called
		Session txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

		// The queue is created on the broker the first time its name is used
		Destination queue = txSession.createQueue(QUEUE_NAME);
		MessageProducer sender = txSession.createProducer(queue);

		TextMessage msg = txSession.createTextMessage();
		msg.setText("这是我的消息,你接收到了吗??");
		msg.setStringProperty("name", "张三");
		sender.send(msg);

		txSession.commit();
		txSession.close();
	}

	/** Closes the connection if it was opened, printing any failure. */
	private static void closeConnection(Connection conn) {
		if (conn == null) {
			return;
		}
		try {
			conn.close();
		} catch (JMSException closeEx) {
			closeEx.printStackTrace();
		}
	}
}

消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.103:61616");
		System.out.println(connectionFactory);
		Connection connection = null;
		try {
			// 创建连接
			connection = connectionFactory.createConnection();
			connection.start();
			
			// 创建会话
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			
			// 创建队列,第一次才创建,first-queue表示名称,destination表示目的地
			Destination destination = session.createQueue("first-queue");
			MessageConsumer consumer = session.createConsumer(destination);
			
			TextMessage message = (TextMessage) consumer.receive();
			String text = message.getText();
			String name = message.getStringProperty("name");
			
			System.out.println(text);
			System.out.println(name);
			
			session.commit();
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e1) {
					e1.printStackTrace();
				}
			}
		}
	}
}

测试

  • 运行生产者,通过页面管理可以看到有一条消息,未被处理
    activemq api基本使用 与 Spring整合JMS(一)

  • 运行消费者,消费者拿到生产者数据
    activemq api基本使用 与 Spring整合JMS(一)
    通过管理员页面查看,生产的消息已经被消费
    activemq api基本使用 与 Spring整合JMS(一)

发布订阅(pub/sub)(一对多)

  1. 每个消息可以有多个消费者
  2. 消息的生产者和消费者之间存在时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息。
  3. JMS规范允许客户端创建持久订阅(durable subscription)

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish/subscribe producer example: publishes a single text message to the
 * topic {@code first-topic} through a transacted session and then disconnects.
 */
public class JmsTopicSender {
	private static final String BROKER_URL = "tcp://192.168.1.103:61616";
	private static final String TOPIC_NAME = "first-topic";

	public static void main(String[] args) {
		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
		System.out.println(factory);
		Connection conn = null;
		try {
			// Open and start the connection to the broker
			conn = factory.createConnection();
			conn.start();
			publishOneMessage(conn);
		} catch (JMSException ex) {
			ex.printStackTrace();
		} finally {
			closeConnection(conn);
		}
	}

	/** Builds the text message, publishes it to the topic and commits the transaction. */
	private static void publishOneMessage(Connection conn) throws JMSException {
		// Transacted session: nothing is published until commit() is called
		Session txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

		// This destination is a topic (not a queue), identified by its name
		Destination topic = txSession.createTopic(TOPIC_NAME);
		MessageProducer publisher = txSession.createProducer(topic);

		TextMessage msg = txSession.createTextMessage();
		msg.setText("只是topic,大家收到了吗");
		msg.setStringProperty("name", "主播A");
		publisher.send(msg);

		txSession.commit();
		txSession.close();
	}

	/** Closes the connection if it was opened, printing any failure. */
	private static void closeConnection(Connection conn) {
		if (conn == null) {
			return;
		}
		try {
			conn.close();
		} catch (JMSException closeEx) {
			closeEx.printStackTrace();
		}
	}
}

消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsTopicReceiver {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.103:61616");
		System.out.println(connectionFactory);
		Connection connection = null;
		try {
			// 创建连接
			connection = connectionFactory.createConnection();
			connection.start();
			
			// 创建会话
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			
			// 创建队列,第一次才创建,first-queue表示名称,destination表示目的地
			Destination destination = session.createTopic("first-topic");
			MessageConsumer consumer = session.createConsumer(destination);
			
			TextMessage message = (TextMessage) consumer.receive();
			String text = message.getText();
			String name = message.getStringProperty("name");
			
			System.out.println(text);
			System.out.println(name);
			
			session.commit();
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e1) {
					e1.printStackTrace();
				}
			}
		}
	}
}

测试

  • 先启动两个消费者,先订阅topic

activemq api基本使用 与 Spring整合JMS(一)

管理员页面

activemq api基本使用 与 Spring整合JMS(一)

  • 生产者发送消息后,两个消费者均收到消息

activemq api基本使用 与 Spring整合JMS(一)

管理员页面
activemq api基本使用 与 Spring整合JMS(一)

事务

事务性会话

第一个入参置为true时,第二个参数不起作用,会话将进行类似于数据库事务的操作,需要手动执行commit / rollback 来提交或回滚。

Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

当消费者不执行commit操作时,事务没有提交,消息不会被确认,仍然保留在broker上,因此消费者每次运行都会重复消费到同一条消息

// session.commit();

非事务性会话

第一个入参置为false时,消息何时被确认取决于创建会话时的应答模式

Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  • AUTO_ACKNOWLEDGE
    当客户端成功从receive方法返回以后,或者[MessageListener.onMessage] 方法成功返回以后,会话会自动确认该消息
			// 创建会话
			Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			// session.commit();
  • CLIENT_ACKNOWLEDGE
    客户端通过调用消息的textMessage.acknowledge();确认消息。
    在这种模式中,确认一条消息会同时确认该会话此前已消费的所有消息。例如一个消费者一共要消费10个消息,在消费到第5个消息时调用textMessage.acknowledge(),那么前5个消息都会被确认
			// 创建会话
			Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
			message.acknowledge();

message.acknowledge();可做批量确认

  • DUPS_OK_ACKNOWLEDGE
    延迟确认:会话可以延迟(批量)确认已消费的消息,期间消息可能被重复投递,因此消费者需要能够容忍重复消息

Spring整合JMS

pom文件

		<!-- ActiveMQ client/broker bundle used by all examples -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.15.8</version>
		</dependency>

		<!-- Spring integration for ActiveMQ -->
		<!-- NOTE(review): the Java examples import org.springframework.jms.core.JmsTemplate,
		     but no spring-jms artifact is declared here; confirm it is on the classpath. -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>5.0.9.RELEASE</version>
		</dependency>

		<!-- Supporting dependencies -->
		<!-- presumably required by the PooledConnectionFactory used in the Spring XML; verify -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
			<version>2.0</version>
		</dependency>

生产者

service-jms-sender.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side Spring configuration: pooled ActiveMQ connection factory,
     the target queue, and a JmsTemplate that sends to that queue by default. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       default-autowire="byName"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Connection pool wrapping the ActiveMQ factory; stop() is invoked when the context closes -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://176.16.0.135:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="50"/>
    </bean>

    <!-- Queue that messages are sent to -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue"/>
    </bean>

    <!-- Alternative topic destination, disabled -->
    <!--<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>-->

    <!-- Template used by the Java code to send messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="destination"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

对应java代码

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * Spring producer example: loads {@code service-jms-sender.xml}, sends one text
 * message through the configured {@code jmsTemplate} bean, then closes the context.
 */
public class SpringJmsSender {
	public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext(
                        "classpath:service-jms-sender.xml");
        try {
            JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");

            // Sends to the template's default destination configured in the XML
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage message = session.createTextMessage();
                    message.setText("Hello,张三");
                    return message;
                }
            });
        } finally {
            // Closing the context runs the connection factory's destroy-method (stop),
            // releasing the pooled broker connections.
            context.close();
        }
    }
}

消费端

配置文件service-jms-reveiver.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side Spring configuration: pooled ActiveMQ connection factory, the
     source queue, a JmsTemplate for blocking receives, and a listener container
     for asynchronous delivery. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       default-autowire="byName"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Connection pool wrapping the ActiveMQ factory; stop() is invoked when the context closes -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://176.16.0.135:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="50"/>
    </bean>

    <!-- Queue that messages are read from -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue"/>
    </bean>

    <!-- Alternative topic destination, disabled -->
    <!--   <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
         <constructor-arg index="0" value="spring-topic"/>
     </bean>-->

    <!-- Template used for the blocking receive example -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="destination"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!-- Listener container for the non-blocking example; it reads from the same
         queue as the jmsTemplate above -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="destination"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>

    <!-- Callback invoked by the container for each received message -->
    <bean id="messageListener" class="priv.dengjl.activemq.springjms.SpringJmsListener"/>
</beans>

阻塞模式

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/**
 * Spring consumer example (blocking mode): loads {@code service-jms-reveiver.xml},
 * waits for one message on the template's default destination, prints the
 * converted payload, then closes the context.
 */
public class SpringJmsReceiver {
	public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext(
                        "classpath:service-jms-reveiver.xml");
        try {
            // Blocking receive through the template
            JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");

            // NOTE(review): the same XML also declares a listener container on this
            // queue, so the listener may take the message before this call does -
            // confirm whether the blocking demo should use a config without it.
            // The payload is kept as Object so a non-text message cannot fail a String cast.
            Object msg = jmsTemplate.receiveAndConvert();

            System.out.println(msg);
        } finally {
            // Closing the context stops the listener container and the connection pool
            context.close();
        }
    }
}

非阻塞模式
引入listener

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message listener used by the listener container: prints the text of every
 * {@link TextMessage} it receives and reports any other message type.
 */
public class SpringJmsListener implements MessageListener{

    /**
     * Prints the body of a text message to standard output.
     *
     * @param message the delivered message; anything other than a
     *                {@link TextMessage} is reported on standard error and skipped
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            // Skip unexpected payload types instead of failing with a ClassCastException
            System.err.println("Ignoring non-text message: " + message);
            return;
        }
        try {
            System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

启动方式

import java.io.IOException;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;

/**
 * Spring consumer example (non-blocking mode): loads {@code service-jms-reveiver.xml}
 * so the configured listener container starts receiving, keeps the process alive
 * until a byte is read from standard input, then closes the context.
 */
public class SpringJmsReceiver2Listener {
	public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext(
                        "classpath:service-jms-reveiver.xml");
        // Non-blocking: messages are handled by the listener declared in the XML
        try {
            // Wait for input so the listener has time to receive messages
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Closing the context stops the listener container and the connection pool
            context.close();
        }
    }
}