java消息中间件
java消息中间件学习
源码下载地址1:https://pan.baidu.com/s/1qYmN4ss
1,为什么使用消息中间件?
例子:老王给小孩将故事、微信公众号。
2, 消息中间件好处?
系统的解耦。(登录系统和日志系统,解耦)
异步(所有用户登录都要通知积分系统)
横向扩展
安全可靠
顺序保证
例子:卡夫卡消息中间件,activeMq消息中间件,消息中间件,消息的消费。(术语和一些消息中间件)
很多时候,消息中间件可以保存消息,直到应用消费为止,很多时候由于目标应用在使用此消息的时候出现系统问题,消息中间件可以在目标系统恢复之后,再将消息发送进行消费。(消息中间件会将消息保存,直到我们的业务系统将消息消费为止。)
ActiveMq 使用实例 :
(生产者)
package com.hxj.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MsgProducer { private static final String url = "tcp://192.168.151.192:61616"; private static final String msgName = "firstMag"; public static void main(String[] args) throws Exception { //1、类似jdbc 首先创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2、创建连接 Connection connection = connectionFactory.createConnection(); //3、启动连接 connection.start(); //4、创建会话 参数 是否在事物处理,连接应答模式 自动应答 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5、创建一个目标 Destination destination = session.createQueue(msgName); //6、创建生产者 MessageProducer messageProducer = session.createProducer(destination); //7、循环发送消息 for (int i = 0; i < 100; i++) { //8、创建消息 TextMessage textMessage = session.createTextMessage("test"+i); messageProducer.send(textMessage); System.out.println("消息发送成功"+textMessage.getText()); } connection.close(); } }
运行代码后 在activeMq 管理客户端
Spring 集成 JMS
(消费者)
package com.hxj.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MsgComsumer { private static final String url = "tcp://192.168.151.192:61616"; private static final String msgName = "firstMag"; public static void main(String[] args) throws Exception { //1、类似jdbc 首先创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2、创建连接 Connection connection = connectionFactory.createConnection(); //3、启动连接 connection.start(); //4、创建会话 参数 是否在事物处理,连接应答模式 自动应答 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5、创建一个目标 Destination destination = session.createQueue(msgName); //6、创建消费者 MessageConsumer messageConsumer = session.createConsumer(destination); //7、创建监听器 messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (Exception e){ e.getStackTrace(); } } }); //connection.close(); } }
(主题模式)Topic
只需要将创建 目标的代码改成:
session.createTopic(msgName);
Spring JMS
源码下载(点击下载)
生产者
pom配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hxj.jms</groupId> <artifactId>jms-spring</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spring.version>4.2.5.RELEASE</spring.version> <activeMq.version>5.7.0</activeMq.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activeMq.version}</version> <exclusions> <exclusion> <artifactId>spring-context</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> </dependencies> </project>
代码
接口类
package com.hxj.jms.producer; public interface ProducerService { void sendMessage(String message); }
实现类
package com.hxj.jms.producer.impl; import com.hxj.jms.producer.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; public class ProducerServiceImpl implements ProducerService { @Autowired private JmsTemplate jmsTemplate; @Resource(name="queueDestination") private Destination destination; public void sendMessage(final String message){ jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage; } }); System.out.println("发送消息:"+message); } }
调用类
package com.hxj.jms.producer; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class AppProducer { public static void main(String[] args) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("producer.xml"); ProducerService producerService = applicationContext.getBean(ProducerService.class); for(int i=0;i<200;i++){ producerService.sendMessage("test:"+i); } } }
消费者