rabbitmq_在代码中使用
简单点说,分为消息生产者与消费者,两者均需要先通过地址+端口+账号创建连接,再由连接创建一个通道:channel
然后通过队列名及exchange名声明并绑定一个队列。
之后就可以发送或者接收消息了。
// Connection to the broker.
private static Connection connection;
// Configure the connection factory from the server settings (host, port, credentials).
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(qsf.getHost());
factory.setPort(qsf.getPort());
factory.setUsername(qsf.getUserName());
factory.setPassword(qsf.getPassword());
try {
    // Open a new connection.
    connection = factory.newConnection();
} catch (Exception ex) {
    // Fail fast: carrying on with a null connection would only surface later as a
    // NullPointerException on createChannel().
    throw new IllegalStateException(
            "Unable to connect to RabbitMQ at " + qsf.getHost() + ":" + qsf.getPort(), ex);
}
// Create a channel on the connection.
Channel channel = connection.createChannel();
// Declare the exchange by name and type (direct/fanout/topic).
channel.exchangeDeclare(mqf.getExchangeName(), mqf.getQueueModel());
// Declare the queue; DURABLE is passed as the durable flag, the remaining
// arguments are exclusive=false, autoDelete=false and no extra arguments.
channel.queueDeclare(mqf.getQueueName(), DURABLE, false, false, null);
// Bind the queue to the exchange; a fanout exchange ignores the routing key.
channel.queueBind(mqf.getQueueName(), mqf.getExchangeName(), mqf.getRoutingKey());
发送消息:
// Publish through the exchange with the routing key the queue was bound with
// (publishing with the queue name only works for fanout exchanges or when both
// values happen to be equal). The body is encoded as UTF-8 explicitly so it does
// not depend on the platform default charset.
channel.basicPublish(mqf.getExchangeName(), mqf.getRoutingKey(),
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
接收消息:
QueueingConsumer consumer = new QueueingConsumer(channel);
// Disable auto-ack so a message is acknowledged only after it has been processed.
boolean autoAck = false;
// Start consuming from the queue with manual acknowledgement.
channel.basicConsume(mqf.getQueueName(), autoAck, consumer);
while (true) {
    // Blocks until a message arrives, so this loop should run on its own thread.
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    // Process the message.
    doWork(message);
    // Acknowledge this single delivery.
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
将上面的代码封装一下,用于spring管理的项目中;MQ地址写入配置文件,并通过注解注入。
1. 准备工作
1) 创建mq.properties配置文件:
# RabbitMQ connection settings; QueueServerConf reads them through the
# ${mq.host}, ${mq.port}, ${mq.username} and ${mq.password} placeholders.
# NOTE(review): the password is stored here in plain text — confirm this file is
# kept out of version control / replaced per environment.
mq.host=47.93.160.120
mq.port=5672
mq.username=zzadmin
mq.password=zzadmin
2) 配置spring扫描目录
<!-- Scan the MQ configuration, MQ networking and service packages for annotated components. -->
<context:component-scan
    base-package="com.harvetech.mq.conf,com.harvetech.mq.zz.conf,com.harvetech.mq.net,com.harvetech.service"/>
3) 创建mq连接信息类
package com.harvetech.mq.conf;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
 * Connection settings of the MQ server (host, port, user name, password).
 *
 * <p>Each field is populated from the matching {@code mq.*} placeholder.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 14:27:09.
 */
@Service
public class QueueServerConf {

    /** Broker host, from {@code mq.host}. */
    @Value("${mq.host}")
    private String host;

    /** Broker port, from {@code mq.port}. */
    @Value("${mq.port}")
    private Integer port;

    /** Login user name, from {@code mq.username}. */
    @Value("${mq.username}")
    private String userName;

    /** Login password, from {@code mq.password}. */
    @Value("${mq.password}")
    private String password;

    /** Creates an empty instance; the fields are filled in afterwards. */
    public QueueServerConf() {
    }

    /** @return the broker host */
    public String getHost() {
        return this.host;
    }

    /** @param host the broker host */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the broker port */
    public Integer getPort() {
        return this.port;
    }

    /** @param port the broker port */
    public void setPort(Integer port) {
        this.port = port;
    }

    /** @return the login user name */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the login user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the login password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the login password */
    public void setPassword(String password) {
        this.password = password;
    }
}
4) 创建mq队列信息类
package com.harvetech.mq.conf;
/**
 * Immutable description of one message queue: queue name, exchange name,
 * routing key and exchange type.
 *
 * <p>All values are fixed at construction time; the fields are {@code final} so
 * an instance can be shared between threads without further synchronisation.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 14:26:04.
 */
public abstract class MQConf {

    /** Queue name. */
    private final String queueName;

    /** Exchange name. */
    private final String exchangeName;

    /** Routing key used when binding the queue to the exchange. */
    private final String routingKey;

    /** Exchange type (for example direct, fanout or topic). */
    private final String queueModel;

    /**
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @param routingKey   routing key for the binding
     * @param queueModel   exchange type
     */
    public MQConf(String queueName, String exchangeName, String routingKey, String queueModel) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
        this.queueModel = queueModel;
    }

    /** @return the queue name */
    public String getQueueName() {
        return queueName;
    }

    /** @return the exchange name */
    public String getExchangeName() {
        return exchangeName;
    }

    /** @return the routing key */
    public String getRoutingKey() {
        return routingKey;
    }

    /** @return the exchange type */
    public String getQueueModel() {
        return queueModel;
    }
}
5) 定制具体mq队列信息
package com.harvetech.mq.zz.conf;
import org.springframework.stereotype.Component;
import com.harvetech.mq.conf.MQConf;
/**
 * Message queue configuration for the Zhongzhou (ZZ) project.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 14:26:04.
 */
@Component
public class ZZMQConf extends MQConf {

    private static final String QUEUE_NAME = "zzmqtest";
    private static final String EXCHANGE_NAME = "zzmqtest.exchange";
    private static final String ROUTING_KEY = "";
    private static final String EXCHANGE_TYPE = "fanout";

    /** Creates the fixed configuration for the ZZ test queue. */
    public ZZMQConf() {
        super(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, EXCHANGE_TYPE);
    }
}
6) 创建信息体
package com.harvetech.mq.conf;
/**
 * A message to send: the payload text plus the queue configuration that says
 * where it goes.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 14:52:12.
 */
public class MessageInfo {

    /** Message payload. */
    private String messageData;

    /** Configuration of the target queue. */
    private MQConf messageQueueConf;

    /** Creates an empty message; both properties start out as {@code null}. */
    public MessageInfo() {
    }

    /**
     * @param messageData      message payload
     * @param messageQueueConf configuration of the target queue
     */
    public MessageInfo(String messageData, MQConf messageQueueConf) {
        this.messageQueueConf = messageQueueConf;
        this.messageData = messageData;
    }

    /** @return the message payload */
    public String getMessageData() {
        return this.messageData;
    }

    /** @param messageData the message payload */
    public void setMessageData(String messageData) {
        this.messageData = messageData;
    }

    /** @return the configuration of the target queue */
    public MQConf getMessageQueueConf() {
        return this.messageQueueConf;
    }

    /** @param messageQueueConf the configuration of the target queue */
    public void setMessageQueueConf(MQConf messageQueueConf) {
        this.messageQueueConf = messageQueueConf;
    }
}
7) 创建队列通道生成器
package com.harvetech.mq.net;
import java.util.HashMap;
import java.util.Map;
import com.harvetech.mq.conf.MQConf;
import com.harvetech.mq.conf.QueueServerConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Creates RabbitMQ channels and caches them per queue name.
 *
 * <p>One static connection is shared by all callers and re-opened when it is
 * missing or closed. For every queue name a single channel is cached; a new
 * one is created (declaring the exchange, the queue and the binding) when no
 * open channel is cached. All access to the shared state is serialised on one
 * lock because {@link #build(MQConf)} is reachable from several threads.
 *
 * <p>NOTE(review): the queue is declared with {@code DURABLE = false} while
 * MQProducer publishes with {@code PERSISTENT_TEXT_PLAIN} — confirm whether the
 * queue is meant to be durable before changing the flag.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 14:28:04.
 */
@Service
public class MessageChannelBuilder {

    /** Value passed as the {@code durable} argument of {@code queueDeclare}. */
    private static final boolean DURABLE = false;

    /** Guards {@link #connection} and {@link #mapChannel}. */
    private static final Object LOCK = new Object();

    /** Shared connection; replaced when found closed. */
    private static Connection connection;

    /** Cache of channels, keyed by queue name. */
    private static final Map<String, Channel> mapChannel = new HashMap<String, Channel>();

    @Autowired
    private QueueServerConf qsf;

    /**
     * Returns an open channel for the given queue configuration.
     *
     * @param mqf queue, exchange, routing key and exchange type to use
     * @return an open channel, or {@code null} when the connection or the
     *         channel could not be established
     */
    public Channel build(MQConf mqf) {
        synchronized (LOCK) {
            if (!this.buildConnection()) {
                // No usable connection: report failure the same way a failed channel does.
                return null;
            }
            return this.buildChannel(mqf);
        }
    }

    /**
     * Makes sure {@link #connection} is open, connecting if necessary.
     *
     * @return {@code true} when an open connection is available
     */
    private boolean buildConnection() {
        if (null != connection && connection.isOpen()) {
            return true;
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(qsf.getHost());
        factory.setPort(qsf.getPort());
        factory.setUsername(qsf.getUserName());
        factory.setPassword(qsf.getPassword());
        try {
            connection = factory.newConnection();
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Returns the cached channel for the queue when it is still open, otherwise
     * creates, configures and caches a new one.
     *
     * @return the channel, or {@code null} when creation or declaration failed
     */
    private Channel buildChannel(MQConf mqf) {
        Channel cached = mapChannel.get(mqf.getQueueName());
        if (cached != null && cached.isOpen()) {
            return cached;
        }
        Channel channel = null;
        try {
            channel = connection.createChannel();
            // Declare the exchange by name and type (direct/fanout/topic).
            channel.exchangeDeclare(mqf.getExchangeName(), mqf.getQueueModel());
            // Declare the queue: durable flag from DURABLE, not exclusive,
            // not auto-delete, no extra arguments.
            channel.queueDeclare(mqf.getQueueName(), DURABLE, false, false, null);
            // Bind the queue to the exchange; a fanout exchange ignores the routing key.
            channel.queueBind(mqf.getQueueName(), mqf.getExchangeName(), mqf.getRoutingKey());
            mapChannel.put(mqf.getQueueName(), channel);
            return channel;
        } catch (Exception ex) {
            ex.printStackTrace();
            // Do not leak a half-configured channel.
            closeQuietly(channel);
            return null;
        }
    }

    /** Closes the channel if it is still open, ignoring any failure while closing. */
    private static void closeQuietly(Channel channel) {
        if (channel == null || !channel.isOpen()) {
            return;
        }
        try {
            channel.close();
        } catch (Exception ignored) {
            // Best effort only: the original failure has already been reported.
        }
    }
}
2. 生产者
1) 创建controller
package com.harvetech.controller;
import javax.annotation.Resource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.harvetech.service.ProducerService;
/**
 * Web endpoint that sends a fixed test message through {@link ProducerService}.
 */
@Controller
public class ProducerController {

    @Resource
    private ProducerService indexService;

    /**
     * Handles the {@code sendmessage} mapping: prints a marker line, sends the
     * test message and returns the view name.
     *
     * @return the view name {@code "sendmessage"}
     */
    @RequestMapping("sendmessage")
    public String sendmessage() {
        final String viewName = "sendmessage";
        System.out.println("--------sendmessage------");
        final String greeting = "hello 张三";
        indexService.delivery(greeting);
        return viewName;
    }
}
2) 创建service
package com.harvetech.service;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.harvetech.mq.net.MQProducer;
import com.harvetech.mq.zz.conf.ZZMQConf;
/**
 * Service that hands messages to {@link MQProducer} for delivery to the queue.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-09 14:51:38.
 */
@Service
public class ProducerService {

    @Resource
    private MQProducer producer;

    /**
     * Sends the given text through the producer.
     *
     * @param message message text to send
     */
    public void delivery(String message) {
        producer.send(message);
    }
}
3) 创建发送消息类
package com.harvetech.mq.net;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.harvetech.mq.conf.MessageInfo;
import com.harvetech.mq.zz.conf.ZZMQConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
/**
 * Producer: publishes messages to the exchange named in the message's queue
 * configuration.
 *
 * <p>Failures while publishing are printed and not propagated to the caller.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 15:20:57.
 */
@Service
public class MQProducer {

    @Autowired
    private MessageChannelBuilder builder;

    @Resource
    private ZZMQConf messageQueueConf;

    /**
     * Sends a text message using the injected {@link ZZMQConf}.
     *
     * @param message message text
     */
    public void send(String message) {
        send(new MessageInfo(message, messageQueueConf));
    }

    /**
     * Sends a message using the queue configuration carried by the message.
     *
     * @param messageInfo payload and target queue configuration
     */
    public void send(MessageInfo messageInfo) {
        Channel channel = builder.build(messageInfo.getMessageQueueConf());
        send(channel, messageInfo);
    }

    /**
     * Publishes the message on the given channel; does nothing when the channel
     * is {@code null}.
     *
     * @param channel     channel to publish on, may be {@code null}
     * @param messageInfo payload and target queue configuration
     */
    public void send(Channel channel, MessageInfo messageInfo) {
        if (channel == null) {
            return;
        }
        String message = messageInfo.getMessageData();
        try {
            // Use the routing key the queue is bound with. Publishing with the
            // queue name only reaches the queue for fanout exchanges or when
            // both values happen to be equal.
            // Encode as UTF-8 explicitly so the bytes do not depend on the
            // platform default charset.
            channel.basicPublish(
                    messageInfo.getMessageQueueConf().getExchangeName(),
                    messageInfo.getMessageQueueConf().getRoutingKey(),
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4) 测试
访问页面发送消息:
在mq管理页面查看消息:
http://47.93.160.120:15672/#/queues/%2F/zzmqtest
3. 消费者
1) 创建消费者类
实现Runnable接口,以便放到独立线程中运行
package com.harvetech.mq.net;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.harvetech.mq.zz.conf.ZZMQConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
/**
 * Consumer: receives messages from the queue described by {@link ZZMQConf} and
 * passes each one to {@link #doWork(String)}.
 *
 * <p>Call {@link #initConsumer()} first, then run this object on its own
 * thread. The loop ends when {@link #set_stop(Boolean)} is called with
 * {@code true}, when the thread is interrupted, or when receiving,
 * {@code doWork} or the acknowledgement throws.
 *
 * <p>NOTE(review): {@code QueueingConsumer} is reported as deprecated in newer
 * amqp-client releases — confirm the client version before upgrading.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-08 17:36:47.
 */
@Service
public abstract class MQConsumer implements Runnable {

    /** How long one receive attempt waits before the stop flag is checked again. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    @Autowired
    private MessageChannelBuilder builder;

    @Resource
    private ZZMQConf messageQueueConf;

    private Channel _channel;

    private QueueingConsumer _consumer;

    /** Written by the controlling thread, read by the consumer thread. */
    private volatile boolean _stop = false;

    /**
     * Processes one received message.
     *
     * @param message message body decoded as UTF-8 text
     */
    public abstract void doWork(String message);

    /**
     * Receive loop: waits for deliveries, processes and acknowledges them until
     * a stop is requested.
     */
    @Override
    public void run() {
        try {
            System.out.println("------begin listen-------");
            if (_consumer == null) {
                System.out.println("-------_consumer not init -------");
                return;
            }
            while (!_stop) {
                // Wait with a timeout instead of blocking forever so a stop
                // request is noticed even when the queue is idle.
                QueueingConsumer.Delivery delivery = _consumer.nextDelivery(POLL_TIMEOUT_MS);
                if (delivery == null) {
                    continue;
                }
                // Decode explicitly as UTF-8 instead of the platform default charset.
                String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                doWork(message);
                // Acknowledge only this delivery, after it has been processed.
                _channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
            System.out.println("------stop listen-------");
        } catch (InterruptedException e) {
            // Interruption is a stop request: keep the flag set for the caller.
            Thread.currentThread().interrupt();
            System.out.println("------stop listen-------");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** @return whether a stop has been requested */
    public Boolean get_stop() {
        return _stop;
    }

    /**
     * Requests the receive loop to stop (or clears the request).
     *
     * @param _stop {@code true} to stop; {@code null} is treated as {@code false}
     */
    public void set_stop(Boolean _stop) {
        this._stop = Boolean.TRUE.equals(_stop);
    }

    /**
     * Opens the channel and registers the consumer with manual acknowledgement.
     *
     * @throws IllegalStateException when no channel could be opened
     * @throws Exception             when registering the consumer fails
     */
    public void initConsumer() throws Exception {
        Channel channel = builder.build(messageQueueConf);
        if (channel == null) {
            throw new IllegalStateException(
                    "Unable to open a channel for queue " + messageQueueConf.getQueueName());
        }
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck is off: messages are acknowledged in run() after doWork().
        boolean autoAck = false;
        channel.basicConsume(messageQueueConf.getQueueName(), autoAck, consumer);
        // Publish the fields only after registration succeeded.
        _channel = channel;
        _consumer = consumer;
    }
}
2) 创建消息业务处理类
package com.harvetech.service;
import org.springframework.stereotype.Service;
import com.harvetech.mq.net.MQConsumer;
/**
 * Business handling of messages received from the MQ.
 *
 * <p>Author: Zhang Lei. Created: 2017-12-09 14:50:47.
 */
@Service
public class ConsumerService extends MQConsumer {

    /**
     * Prints the received message prefixed with a dashed marker.
     *
     * @param message received message text
     */
    @Override
    public void doWork(String message) {
        final String line = "----------" + message;
        System.out.println(line);
    }
}
3) 添加监听器
在监听器中启动消费者线程。
① 在web.xml中添加监听配置:
<!-- Registers ZZMQListener, a ServletContextListener whose contextInitialized
     method starts the MQ consumer thread. -->
<listener>
<listener-class>com.harvetech.listener.ZZMQListener</listener-class>
</listener>
② 在spring中添加bean
因为在监听器中不能通过注入的方法取到消费者实例,因此需要配置为bean。
<!-- Explicit bean definition: ZZMQListener looks the consumer up by the id "ZZConsumerService". -->
<bean id="ZZConsumerService" class="com.harvetech.service.ConsumerService"></bean>
③ 创建监听器
package com.harvetech.listener;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.harvetech.service.ConsumerService;
/*
*
* @ClassName: ZZMQListener
* @Description: 监听mq消息
* @author 张磊
* @date 2017年12月9日 下午2:48:18
*
*/
public class ZZMQListener implements ServletContextListener {
public void contextInitialized(ServletContextEvent sce) {
try {
WebApplicationContext applicationContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
ConsumerService consumerService = (ConsumerService)applicationContext.getBean("ZZConsumerService");
consumerService.initConsumer();
Thread thread1 = new Thread(consumerService);
thread1.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void contextDestroyed(ServletContextEvent arg0) {
System.out.println("-------监听结束-------");
}
}
4) 测试
启动项目,自动开启监听器,开始在线程中获取队列消息:
后台输出日志:
队列管理页面查看消息: