maven搭建JMS消息队列(ActiveMQ)
步骤一:到ActiveMQ官网下载apache-activemq-5.9.1-bin.zip,启动activemq.bat,
打开http://localhost:8161/监控消息队列面板
步骤二:
创建maven项目
pom依赖
<!-- activemq 相关maven依赖 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version> </dependency> <!-- 日志相关依赖 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency>
消息发送者Sender:
public class Sender { //默认代理地址 "failover://tcp://localhost:61616" 服务器地址不同IP修改不同的IP private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL; //消息队列名称 private static final String SUBJECT="my-activemq"; private static int i=1; public static void main(String[] args) throws JMSException, InterruptedException { //初始化连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL); //建立连接 Connection conn= connectionFactory.createConnection(); //启动连接 conn.start(); //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式 Session session= conn.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目标队列 Destination dest = session.createQueue(SUBJECT); //通过session创建消息的发送者 MessageProducer producer=session.createProducer(dest); while(true){ //定义要发送的消息 TextMessage message= session.createTextMessage("======ActiveMQ发送消息===="+i+"==="); //发送消息 producer.send(message); //休眠2秒 Thread.sleep(2000); i++; } // conn.close(); } }
消息接收者Receiver:
public class Receiver implements MessageListener{ //默认代理地址 "failover://tcp://localhost:61616" 服务器地址不同IP修改不同的IP private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL; //消息队列名称 private static final String SUBJECT="my-activemq"; public static void main(String[] args) throws JMSException { //初始化连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL); //建立连接 Connection conn= connectionFactory.createConnection(); //启动连接 conn.start(); //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式 Session session= conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目标队列 Destination dest=session.createQueue(SUBJECT); //通过session创建消息的接收者 MessageConsumer consumer= session.createConsumer(dest); //初始化监听 Receiver receiver=new Receiver(); //给接收者添加监听对象 consumer.setMessageListener(receiver); } public void onMessage(Message arg0) { TextMessage message=(TextMessage) arg0; try { System.out.println(message.getText()); Thread.sleep(4000); } catch (Exception e) { } } }
启动Sender,再启动Receiver,查看控制台和ActiveMQ控制台