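A beginner's Spring Boot + RabbitMQ example

I've been working with RabbitMQ lately, watching videos and digging through material. Below is a summary of what I learned. It is aimed at beginners; experts can skip it.

First, RabbitMQ is one kind of message queue. Other well-known ones include Kafka, ActiveMQ, and Alibaba's RocketMQ. Their usage and differences are covered all over the web, so I won't ramble on about them here.

Straight to the code: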

1. Set up the project

I use IntelliJ IDEA, and the steps below create a Spring Boot project with IDEA's Spring Initializr.

On the first page, make sure the JDK version is 1.8 or later, then click Next.

Fill in the project name, then click Next.

Find RabbitMQ in the dependency list and select it, click Next, then Finish. The project is now set up.

2. Configure RabbitMQ

Open application.properties in the resources folder and add the following configuration:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=fan
spring.rabbitmq.password=123

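That's it for configuration, and it is ready to use. This is what makes setting up a project with Spring Boot so great. (The username and password must belong to a user that already exists on your broker.)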

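3. Examples

RabbitMQ has several exchange types; the three covered here are fanout, direct, and topic. Don't panic, take it easy, and we'll go through the code one at a time. Note that the listings below use the RabbitMQ Java client (com.rabbitmq.client) directly. A bare new ConnectionFactory() connects to localhost:5672 as guest/guest and does not read application.properties.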

(1)fanout

Producer:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final  String EXCHANGE_NAME="test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
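        // Declare the exchange.
        // An exchange does not store messages: if no queue is bound yet, the message is discarded.
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        String msg = "hello fanout";
        // Publish the message; a fanout exchange ignores the routing key, so it is left empty.
        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));
        System.out.println("send: " + msg);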
        channel.close();
        connection.close();
    }
}

Consumers (there can be more than one):

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    private static final  String QUEUE_NAME="test_queue_fanout_email";
    private static final  String EXCHANGE_NAME="test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        channel.basicQos(1);

        // Define the consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("[1]---recv:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Listen on the queue
//        boolean autoACK = true; // automatic acknowledgement
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer);
    }

}

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    private static final  String QUEUE_NAME="test_queue_fanout_sms";
    private static final  String EXCHANGE_NAME="test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        channel.basicQos(1);

        // Define the consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("[2]---recv:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer);
    }

}

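Run the producer once first so that the exchange exists, then start the two consumers, and run the producer again. If both consumers print the message, you now know how to use fanout. In the usual RabbitMQ diagram, X stands for the exchange: it forwards each message indiscriminately to every bound queue and stores nothing, so once a message is forwarded it is gone from the exchange. This is also why the message from the very first producer run is lost on a fresh broker: no queue was bound yet.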
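To make the "forward and keep nothing" behaviour concrete, here is a minimal in-memory sketch of a fanout exchange. It is not RabbitMQ client code: FanoutModel and its bind/publish methods are names made up for illustration, and a plain java.util.Queue stands in for a broker queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a fanout exchange; not part of the RabbitMQ client API.
public class FanoutModel {
    // Queues currently bound to the simulated exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    public void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Copies the message into every bound queue and returns how many queues got it.
    // The exchange itself keeps nothing: with zero bindings the message is simply lost.
    public int publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        System.out.println(exchange.publish("too early")); // 0, nothing is bound yet

        Queue<String> email = new ArrayDeque<>();
        Queue<String> sms = new ArrayDeque<>();
        exchange.bind(email);
        exchange.bind(sms);

        System.out.println(exchange.publish("hello fanout")); // 2
        System.out.println(email + " " + sms); // [hello fanout] [hello fanout]
    }
}
```

The first publish reaches no queue, which mirrors what happens when the producer runs before any consumer has bound its queue.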

(2)direct

Producer (only a small change to the code above; the imports are the same as in the fanout producer):

public class Send {
    private static final  String EXCHANGE_NAME="test_exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // route by routing key
        // Publish the message
        String msg = "hello direct";
        String routingKey = "info";
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
        System.out.println("send: " + msg);
        channel.close();
        connection.close();
    }
}

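Consumer (ConnectionUtil.getConnection() is a helper that is not listed in this post; replacing it with the ConnectionFactory code from the fanout examples works the same way):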

public class Recv2 {
    private static final  String QUEUE_NAME="test_queue_direct_2";
    private static final  String EXCHANGE_NAME="test_exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Deliver at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
        // Define the consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("[2]---recv:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Listen on the queue
//        boolean autoACK = true; // automatic acknowledgement
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer);
    }
}

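Try the code above yourself and change the value of routingKey. What changes?

The difference between direct and fanout is the routingKey: a direct exchange delivers a message only to the queues whose binding key equals the message's routing key.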
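The exact-match rule can be sketched without a broker. The following is an in-memory model only, not RabbitMQ client code: DirectModel, bind, and route are illustrative names, and the queue test_queue_direct_1 bound only to error is an assumption added for contrast (the post shows just Recv2).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a direct exchange; not part of the RabbitMQ client API.
public class DirectModel {
    // Binding key -> names of the queues bound with exactly that key.
    private final Map<String, Set<String>> bindings = new HashMap<>();

    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queueName);
    }

    // Keys are compared for exact equality; no matching binding means no delivery.
    public Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        // Assumed first consumer that only cares about errors.
        exchange.bind("test_queue_direct_1", "error");
        // The same three bindings as Recv2.
        exchange.bind("test_queue_direct_2", "error");
        exchange.bind("test_queue_direct_2", "info");
        exchange.bind("test_queue_direct_2", "warning");

        System.out.println(exchange.route("info"));  // [test_queue_direct_2]
        System.out.println(exchange.route("error")); // [test_queue_direct_1, test_queue_direct_2]
        System.out.println(exchange.route("debug")); // []
    }
}
```

Changing the producer's routingKey from info to error or debug corresponds to the second and third calls.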

(3)topic

Again, the producer first:

public class TopicSend {
    private static final  String EXCHANGE_NAME="test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        String msg = "hello topic";
//        String routingKey = "info";
        channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,msg.getBytes());
        System.out.println("send" + msg);
        channel.close();
        connection.close();
    }
}

Then the consumers:

public class TopicRecv1 {
    private static final  String QUEUE_NAME="test_queue_topic_1";
    private static final  String EXCHANGE_NAME="test_exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
        // Define the consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("[1]---recv:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Listen on the queue
//        boolean autoACK = true; // automatic acknowledgement
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer);
    }
}

public class TopicRecv2 {
    private static final  String QUEUE_NAME="test_queue_topic_2";
    private static final  String EXCHANGE_NAME="test_exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");
        // Define the consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("[2]---recv:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Listen on the queue
//        boolean autoACK = true; // automatic acknowledgement
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer);
    }
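}

As you can see, the only difference between consumer 1 and consumer 2 is the binding key: goods.add versus goods.#. In a topic binding key, # matches zero or more dot-separated words and * matches exactly one word. Since the producer publishes with goods.delete, only consumer 2 receives the message.

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add"); // consumer 1: only goods.add

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#"); // consumer 2: goods followed by any number of words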
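The wildcard rules are easy to check offline. Below is a small sketch of topic matching as I understand it from the AMQP rules (* is exactly one word, # is zero or more words). TopicMatch and matches are illustrative names, not a RabbitMQ API, and the broker's real implementation is certainly different.

```java
// Hypothetical helper that mimics topic-exchange key matching; not part of the RabbitMQ client API.
public class TopicMatch {

    public static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words p[i..] against routing-key words k[j..].
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' stands for exactly one word; any other word must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("goods.add", "goods.delete"));     // false
        System.out.println(matches("goods.#", "goods.delete"));       // true
        System.out.println(matches("goods.#", "goods.delete.batch")); // true
        System.out.println(matches("goods.*", "goods.delete.batch")); // false
    }
}
```

The first two lines correspond to consumer 1 and consumer 2 receiving, or not receiving, the goods.delete message; the routing key goods.delete.batch is made up to show where * and # differ.
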

That covers the basics of using RabbitMQ. To learn more, have a look at this article: https://www.jianshu.com/p/79ca08116d57