springboot+rabbitmq入门应用小示例
最近在搞rabbitmq,各种看视频,找资料,以下是我的一些总结,适用于小白,大神请略过。
首先,rabbitmq是消息队列的一种,比较出名的还有像Kafka 和 ActiveMQ,以及阿里的rocketmq,具体用法和区别,网上一搜一大堆,在此不再啰哩啰嗦的介绍。
直接看代码:
1、搭建项目
博主用的开发工具是idea,下面是基于idea里的Sping initializr搭建springboot项目
进入以下页面,注意jdk版本必须1.8以上,然后点击next
填写好项目名,next
按照下图找到rabbitmq,然后选择,最后next,然后finish,ok,搭建完事。
2、配置rabbitmq
在resources文件夹下找到application.properties文件,打开,添加以下配置:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=fan spring.rabbitmq.password=123
好了,配置也完事,现在可以直接用了,springboot搭建项目就是这么牛X。
3、小示例
rabbitmq有这么几种模式:fanout,direct,topic,别慌,淡定一点,我们一个一个来上代码。
(1)fanout
producer:生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 //交换机不能存储消息,发送完就没有了 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String msg = "hello fanout"; //发送消息 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("send" + msg); channel.close(); connection.close(); } }
consumer:消费者,消费者可以有多个
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME="test_queue_fanout_email"; private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1); //定义消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body,"utf-8"); System.out.println("[1]---recv:" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //监听队列 // boolean autoACK = true;//自动应答 boolean autoACK = false; channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME="test_queue_fanout_sms"; private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1); //定义消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body,"utf-8"); System.out.println("[2]---recv:" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[2] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoACK = false; channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer); } }
首先启动producer,然后再分别启动两个consumer,看看是不是成功了,如果成功了,ok,你已经会使用fanout模式了,代码测试完毕,借用别人的图来说一下,X代表exchange交换机,他会将消息无差别转发给绑定的queue,并且不做保存,转发完就没有了。
(2)direct
prosucer,只需将上面的代码稍作修改即可
public class Send { private static final String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection();
Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,"direct");//分发 //发送消息 String msg = "hello drect"; String routingKey = "info"; channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes()); System.out.println("send" + msg); channel.close(); connection.close(); }}
消费者:
public class Recv2 { private static final String QUEUE_NAME="test_queue_direct_2"; private static final String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //获取通道 final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning"); //定义消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body,"utf-8"); System.out.println("[2]---recv:" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[2] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //监听队列 // boolean autoACK = true;//自动应答 boolean autoACK = false; channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer); } }
自己测试上面代码,尝试修改routingKey的值,看看有什么不同?
direct和fanout模式的区别就在于routingKey,也就是路由键,交换机可以根据不同的路由键发送到指定的队列,借别人的图一用:
(3)topic
还是先来producer
public class TopicSend { private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String msg = "hello topic"; // String routingKey = "info"; channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,msg.getBytes()); System.out.println("send" + msg); channel.close(); connection.close(); } }
再来consumer:
public class TopicRecv1 { private static final String QUEUE_NAME="test_queue_topic_1"; private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //获取通道 final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add"); //定义消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body,"utf-8"); System.out.println("[1]---recv:" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //监听队列 // boolean autoACK = true;//自动应答 boolean autoACK = false; channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer); } }
public class TopicRecv2 { private static final String QUEUE_NAME="test_queue_topic_2"; private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //获取通道 final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#"); //定义消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body,"utf-8"); System.out.println("[2]---recv:" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[2] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //监听队列 // boolean autoACK = true;//自动应答 boolean autoACK = false; channel.basicConsume(QUEUE_NAME,autoACK,defaultConsumer); } }聪明如你,可以看出来消费者1和2区别就在于goods.add和goods.#,#代表匹配多个
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");好了,以上就是rabbitmq的简单使用,想了解更多,可以看看这篇文章https://www.jianshu.com/p/79ca08116d57