Using RabbitMQ in code

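Put simply, there is a message producer and a message consumer. Both first open a connection from host + port + account, and then create a channel on that connection.

The channel then declares an exchange and a queue by name and binds the two together.

After that, messages can be sent or received.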

private static Connection connection;

// Create the connection factory (qsf holds the server settings, see QueueServerConf below)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(qsf.getHost());
factory.setPort(qsf.getPort());
factory.setUsername(qsf.getUserName());
factory.setPassword(qsf.getPassword());
try {
    // Open a new connection
    connection = factory.newConnection();
} catch (Exception ex) {
    ex.printStackTrace();
}

// Create a channel
Channel channel = connection.createChannel();
// Exchange name and exchange type (direct/fanout/topic); an overload takes a durable flag as a third argument
channel.exchangeDeclare(mqf.getExchangeName(), mqf.getQueueModel());
// Declare the queue: name, durable (the DURABLE constant), exclusive, autoDelete, arguments
channel.queueDeclare(mqf.getQueueName(), DURABLE, false, false, null);
// Bind the queue to the exchange; a fanout exchange ignores the third argument (the routing key)
channel.queueBind(mqf.getQueueName(), mqf.getExchangeName(), mqf.getRoutingKey());
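The remark that a fanout exchange ignores the routing key can be illustrated with a toy model. The sketch below is not the RabbitMQ client API; the class RoutingSketch, its Binding type and the "audit" queue are hypothetical names introduced only for this illustration, and it models just the fanout and direct types.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of exchange routing; a stand-in, not the RabbitMQ client API. */
final class RoutingSketch {

    /** A queue bound to an exchange with a binding key. */
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Returns the queues that would receive a message published with routingKey. */
    static List<String> route(String exchangeType, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            boolean matches;
            if ("fanout".equals(exchangeType)) {
                matches = true; // fanout: every bound queue gets a copy
            } else if ("direct".equals(exchangeType)) {
                matches = b.bindingKey.equals(routingKey); // direct: exact key match
            } else {
                throw new IllegalArgumentException("type not modelled: " + exchangeType);
            }
            if (matches) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> bindings = new ArrayList<>();
        bindings.add(new Binding("zzmqtest", ""));
        bindings.add(new Binding("audit", "audit.key")); // hypothetical second queue
        System.out.println(route("fanout", bindings, "anything"));
        System.out.println(route("direct", bindings, "audit.key"));
    }
}
```

With a fanout exchange both queues are selected whatever key is published; with a direct exchange only the queue whose binding key equals the routing key is selected.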

 

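Sending a message:

// Publish to the exchange; the second argument is the routing key (ignored by a fanout exchange)
channel.basicPublish(mqf.getExchangeName(), mqf.getRoutingKey(),
        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));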

 

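Receiving messages:

// Note: QueueingConsumer is deprecated in later amqp-client releases and removed in 5.x
QueueingConsumer consumer = new QueueingConsumer(channel);
// Turn off autoAck so that each message is acknowledged explicitly
boolean autoAck = false;
// Start consuming with manual acknowledgement
channel.basicConsume(mqf.getQueueName(), autoAck, consumer);
while (true) {
    // Blocks until a message arrives, so this loop is best run in its own thread
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    // Process the message
    doWork(message);
    // Acknowledge this delivery only (multiple = false)
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}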
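Because the receive call blocks, the loop has to live in a thread and needs a way to be stopped. The following is a minimal sketch of that pattern using only the JDK: a BlockingQueue stands in for the broker, and the class StoppableWorkerSketch and its methods are hypothetical names, not part of the RabbitMQ client. It also shows the body being encoded and decoded with the same explicit charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Stoppable worker loop; the BlockingQueue is a stand-in for the broker. */
final class StoppableWorkerSketch implements Runnable {
    private final BlockingQueue<byte[]> deliveries = new LinkedBlockingQueue<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean stop = false; // volatile: written by another thread

    void deliver(String message) {
        deliveries.add(message.getBytes(StandardCharsets.UTF_8));
    }

    void requestStop() {
        stop = true;
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                // Wait briefly rather than forever so the stop flag is re-checked.
                byte[] body = deliveries.poll(100, TimeUnit.MILLISECONDS);
                if (body == null) {
                    continue;
                }
                // Decode with the same charset the sender used.
                handled.add(new String(body, StandardCharsets.UTF_8));
                // A real consumer would acknowledge the delivery at this point.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Delivers all messages, runs the worker in a thread, then stops it. */
    static List<String> processAll(List<String> messages) {
        StoppableWorkerSketch worker = new StoppableWorkerSketch();
        for (String m : messages) {
            worker.deliver(m);
        }
        Thread thread = new Thread(worker);
        thread.start();
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (worker.handled.size() < messages.size()
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.requestStop();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(worker.handled);
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        messages.add("hello 张三");
        System.out.println(processAll(messages));
    }
}
```

Polling with a timeout is a design choice of this sketch: a receive call that waits forever only notices a stop request after the next message arrives.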

 

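Next, wrap the code above for use in a Spring-managed project; the MQ address lives in a configuration file and is injected with annotations.

1. Preparation

1) Create the mq.properties configuration file: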

mq.host=47.93.160.120

mq.port=5672

mq.username=zzadmin

mq.password=zzadmin
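In the project itself Spring resolves these keys. To see what the file yields as plain values, here is a small sketch that parses the same key names with java.util.Properties. The class MqPropertiesSketch and its methods are hypothetical, and the host value in the example is a placeholder rather than the server above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Parses mq.* settings from properties text; a sketch, not part of the project. */
final class MqPropertiesSketch {

    /** Parses properties text; a real application would read the file from the classpath. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads mq.port as an int and fails fast on a missing value. */
    static int port(Properties props) {
        String raw = props.getProperty("mq.port");
        if (raw == null) {
            throw new IllegalStateException("mq.port is missing");
        }
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = parse("mq.host=localhost\nmq.port=5672\n");
        System.out.println(props.getProperty("mq.host") + ":" + port(props));
    }
}
```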

 

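2) Configure the packages for Spring to scan (the ${mq.*} placeholders used below also require mq.properties to be registered with Spring, for example through a context:property-placeholder element):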

<context:component-scan base-package="com.harvetech.mq.conf"></context:component-scan>

<context:component-scan base-package="com.harvetech.mq.zz.conf"></context:component-scan>

<context:component-scan base-package="com.harvetech.mq.net"></context:component-scan>

<context:component-scan base-package="com.harvetech.service"></context:component-scan>

 

3) Create the MQ connection settings class

package com.harvetech.mq.conf;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * @ClassName: QueueServerConf
 * @Description: MQ connection settings
 * @author 张磊
 * @date 2017128日 2:27:09 PM
 */
@Service
public class QueueServerConf {
    @Value("${mq.host}")
    private String host;
    @Value("${mq.port}")
    private Integer port;
    @Value("${mq.username}")
    private String userName;
    @Value("${mq.password}")
    private String password;

    public QueueServerConf() {
        super();
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

 

4) Create the MQ queue settings class

package com.harvetech.mq.conf;

/**
 * @ClassName: MQConf
 * @Description: MQ queue settings
 * @author 张磊
 * @date 2017128日 2:26:04 PM
 */
public abstract class MQConf {
    // Queue name
    private String queueName;
    // Exchange name
    private String exchangeName;
    // Routing key
    private String routingKey;
    // Exchange type (direct/fanout/topic)
    private String queueModel;

    public MQConf(String queueName, String exchangeName, String routingKey, String queueModel) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
        this.queueModel = queueModel;
    }

    public String getQueueName() {
        return queueName;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public String getQueueModel() {
        return queueModel;
    }
}

 

5) Define the settings for a concrete queue

package com.harvetech.mq.zz.conf;

import org.springframework.stereotype.Component;
import com.harvetech.mq.conf.MQConf;

/**
 * @ClassName: ZZMQConf
 * @Description: Message configuration for the Zhongzhou (ZZ) project
 * @author 张磊
 * @date 2017128日 2:26:04 PM
 */
@Component
public class ZZMQConf extends MQConf {
    public ZZMQConf() {
        // queue name, exchange name, routing key (unused by fanout), exchange type
        super("zzmqtest", "zzmqtest.exchange", "", "fanout");
    }
}

 

6) Create the message wrapper

package com.harvetech.mq.conf;

/**
 * @ClassName: MessageInfo
 * @Description: Message wrapper
 * @author 张磊
 * @date 2017128日 2:52:12 PM
 */
public class MessageInfo {
    private String messageData; // message body
    private MQConf messageQueueConf; // queue settings for this message

    public MessageInfo() {
    }

    public MessageInfo(String messageData, MQConf messageQueueConf) {
        this.messageData = messageData;
        this.messageQueueConf = messageQueueConf;
    }

    public String getMessageData() {
        return messageData;
    }

    public void setMessageData(String messageData) {
        this.messageData = messageData;
    }

    public MQConf getMessageQueueConf() {
        return messageQueueConf;
    }

    public void setMessageQueueConf(MQConf messageQueueConf) {
        this.messageQueueConf = messageQueueConf;
    }
}

 

7) Create the channel builder

package com.harvetech.mq.net;

import java.util.HashMap;
import java.util.Map;
import com.harvetech.mq.conf.MQConf;
import com.harvetech.mq.conf.QueueServerConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @ClassName: MessageChannelBuilder
 * @Description: Channel builder
 * @author 张磊
 * @date 2017128日 2:28:04 PM
 */
@Service
public class MessageChannelBuilder {

    // Durable queue: persistent messages survive a broker restart only in a durable queue
    private static final boolean DURABLE = true;

    // One connection shared by all channels
    private static Connection connection;

    @Autowired
    private QueueServerConf qsf;

    // Channels cached by queue name
    private static Map<String, Channel> mapChannel = new HashMap<String, Channel>();

    public Channel build(MQConf mqf) {
        this.buildConnection();
        Channel channel = this.buildChannel(mqf);
        return channel;
    }

    private void buildConnection() {
        // Reuse the existing connection while it is open
        if (null != connection && connection.isOpen()) {
            return;
        }
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(qsf.getHost());
        factory.setPort(qsf.getPort());
        factory.setUsername(qsf.getUserName());
        factory.setPassword(qsf.getPassword());
        try {
            // Open a new connection
            connection = factory.newConnection();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private Channel buildChannel(MQConf mqf) {
        // Reuse the cached channel for this queue while it is open
        if (mapChannel.containsKey(mqf.getQueueName())) {
            Channel channel = mapChannel.get(mqf.getQueueName());
            if (channel.isOpen()) {
                return channel;
            }
        }
        try {
            // Create a channel
            Channel channel = connection.createChannel();
            // Exchange name and exchange type (direct/fanout/topic)
            channel.exchangeDeclare(mqf.getExchangeName(), mqf.getQueueModel());
            // Declare the queue: name, durable, exclusive, autoDelete, arguments
            channel.queueDeclare(mqf.getQueueName(), DURABLE, false, false, null);
            // Bind the queue to the exchange; a fanout exchange ignores the routing key
            channel.queueBind(mqf.getQueueName(), mqf.getExchangeName(), mqf.getRoutingKey());
            mapChannel.put(mqf.getQueueName(), channel);
            return channel;
        } catch (Exception ex) {
            // Also reached when the connection could not be opened (connection is null)
            ex.printStackTrace();
            return null;
        }
    }
}
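The builder's "reuse the cached channel if it is still open, otherwise create a new one" logic can be written so that the check and the replacement happen atomically per key. The sketch below shows one way to do that with ConcurrentHashMap.compute, using only the JDK. HandleCacheSketch and its constructor parameters are hypothetical names, and the generic handle type H merely stands in for a channel; this is an illustration of the caching pattern, not a drop-in replacement for the class above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;

/** Cache of reusable handles keyed by queue name; a stand-in for the channel map. */
final class HandleCacheSketch<H> {
    private final Map<String, H> cache = new ConcurrentHashMap<>();
    private final Function<String, H> opener; // creates a handle for a queue name
    private final Predicate<H> isOpen;        // tells whether a handle is still usable

    HandleCacheSketch(Function<String, H> opener, Predicate<H> isOpen) {
        this.opener = opener;
        this.isOpen = isOpen;
    }

    /** Returns the cached handle if it is open, otherwise opens and caches a new one. */
    H get(String queueName) {
        // compute() runs atomically per key, so two threads cannot both open a handle.
        return cache.compute(queueName, (name, existing) ->
                existing != null && isOpen.test(existing) ? existing : opener.apply(name));
    }

    public static void main(String[] args) {
        // An AtomicBoolean plays the role of a handle: true means "open".
        HandleCacheSketch<AtomicBoolean> cache =
                new HandleCacheSketch<>(name -> new AtomicBoolean(true), AtomicBoolean::get);
        AtomicBoolean first = cache.get("zzmqtest");
        System.out.println(first == cache.get("zzmqtest")); // same handle reused
        first.set(false);                                   // simulate a closed handle
        System.out.println(first == cache.get("zzmqtest")); // a new handle was opened
    }
}
```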

 

2. Producer

1) Create the controller

package com.harvetech.controller;

import javax.annotation.Resource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.harvetech.service.ProducerService;

@Controller
public class ProducerController {
    @Resource
    private ProducerService indexService;

    @RequestMapping("sendmessage")
    public String sendmessage() {
        System.out.println("--------sendmessage------");
        indexService.delivery("hello 张三");
        // Name of the view to render
        return "sendmessage";
    }
}

2) Create the service

package com.harvetech.service;

import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.harvetech.mq.net.MQProducer;
import com.harvetech.mq.zz.conf.ZZMQConf;

/**
 * @ClassName: ProducerService
 * @Description: Sends messages to the queue
 * @author 张磊
 * @date 2017129日 2:51:38 PM
 */
@Service
public class ProducerService {

    @Resource
    private ZZMQConf messageQueueConf;

    @Resource
    private MQProducer producer;

    public void delivery(String message) {
        producer.send(message);
    }
}

3) Create the message sending class

package com.harvetech.mq.net;

import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.harvetech.mq.conf.MessageInfo;
import com.harvetech.mq.zz.conf.ZZMQConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

/**
 * @ClassName: MQProducer
 * @Description: Producer (sends messages)
 * @author 张磊
 * @date 2017128日 3:20:57 PM
 */
@Service
public class MQProducer {
    @Autowired
    private MessageChannelBuilder builder;

    @Resource
    private ZZMQConf messageQueueConf;

    // Send to the default (ZZ) queue configuration
    public void send(String message) {
        Channel channel = builder.build(messageQueueConf);
        MessageInfo messageInfo = new MessageInfo(message, messageQueueConf);
        send(channel, messageInfo);
    }

    // Send to the queue configuration carried by the message
    public void send(MessageInfo messageInfo) {
        Channel channel = builder.build(messageInfo.getMessageQueueConf());
        send(channel, messageInfo);
    }

    public void send(Channel channel, MessageInfo messageInfo) {
        // The builder returns null when no channel could be created
        if (channel == null) {
            return;
        }

        String message = messageInfo.getMessageData();
        try {
            // Publish to the exchange with the configured routing key (ignored by a fanout exchange)
            channel.basicPublish(messageInfo.getMessageQueueConf().getExchangeName(),
                    messageInfo.getMessageQueueConf().getRoutingKey(),
                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4) Test

Open the sendmessage page to send a message:

[screenshot]

Check the message on the MQ management page:

http://47.93.160.120:15672/#/queues/%2F/zzmqtest

[screenshot]

  

3. Consumer

1) Create the consumer class

It implements Runnable so that it can run in its own thread.

package com.harvetech.mq.net;

import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.harvetech.mq.zz.conf.ZZMQConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @ClassName: MQConsumer
 * @Description: Consumer (receives messages)
 * @author 张磊
 * @date 2017128日 5:36:47 PM
 */
@Service
public abstract class MQConsumer implements Runnable {
    @Autowired
    private MessageChannelBuilder builder;
    @Resource
    private ZZMQConf messageQueueConf;

    private Channel _channel;
    private QueueingConsumer _consumer;
    // volatile: set from another thread to stop the loop
    private volatile boolean _stop = false;

    // Business handling, supplied by the subclass
    public abstract void doWork(String message);

    public void run() {
        try {
            System.out.println("------begin listen-------");
            if (_consumer == null) {
                System.out.println("-------_consumer not init -------");
                return;
            }
            // The flag is only checked between messages, because nextDelivery() blocks
            while (!_stop) {
                // Blocks until a message arrives, so this must run in its own thread
                QueueingConsumer.Delivery delivery = _consumer.nextDelivery();
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                // Process the message
                doWork(message);
                // Acknowledge this delivery only (multiple = false)
                _channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
            System.out.println("------stop listen-------");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Boolean get_stop() {
        return _stop;
    }

    public void set_stop(Boolean _stop) {
        this._stop = _stop;
    }

    public void initConsumer() throws Exception {
        _channel = builder.build(messageQueueConf);
        _consumer = new QueueingConsumer(_channel);
        // Turn off autoAck so that each message is acknowledged explicitly
        boolean autoAck = false;
        // Start consuming with manual acknowledgement
        _channel.basicConsume(messageQueueConf.getQueueName(), autoAck, _consumer);
    }
}

 

2) Create the message handling class

package com.harvetech.service;

import org.springframework.stereotype.Service;
import com.harvetech.mq.net.MQConsumer;

/**
 * @ClassName: ConsumerService
 * @Description: Business handling for messages received from MQ
 * @author 张磊
 * @date 2017129日 2:50:47 PM
 */
@Service
public class ConsumerService extends MQConsumer {
    @Override
    public void doWork(String message) {
        System.out.println("----------" + message);
    }
}

 

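3) Add a listener

The consumer thread is started from a listener.

① Register the listener in web.xml. Listeners run in declaration order and this one needs the Spring context to exist already, so declare it after Spring's ContextLoaderListener: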

  <listener>  

  <listener-class>com.harvetech.listener.ZZMQListener</listener-class>  

</listener>  

 

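② Add a bean to the Spring configuration

The listener is created by the servlet container rather than by Spring, so the consumer cannot be injected into it. Declare the consumer as a named bean and look it up from the listener instead.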

<bean id="ZZConsumerService" class="com.harvetech.service.ConsumerService"></bean>

 

③ Create the listener

package com.harvetech.listener;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.harvetech.service.ConsumerService;

/**
 * @ClassName: ZZMQListener
 * @Description: Listens for MQ messages
 * @author 张磊
 * @date 2017129日 2:48:18 PM
 */
public class ZZMQListener implements ServletContextListener {

    public void contextInitialized(ServletContextEvent sce) {
        try {
            // Look up the Spring context that was created at web application startup
            WebApplicationContext applicationContext =
                    WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
            ConsumerService consumerService =
                    (ConsumerService) applicationContext.getBean("ZZConsumerService");
            consumerService.initConsumer();
            // Run the blocking receive loop in its own thread
            Thread thread1 = new Thread(consumerService);
            thread1.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void contextDestroyed(ServletContextEvent arg0) {
        System.out.println("-------listener stopped-------");
    }
}

 

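4) Test

Start the project. The listener starts automatically and the consumer thread begins fetching messages from the queue.

Backend log output:

[screenshot]

Messages on the queue management page:

[screenshot]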