RabbitMQ
分类:
文章
•
2024-10-17 12:59:52
-
是什么?
- 消息中间件
- RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写
- 官网:https://www.rabbitmq.com/
-
干什么?
- 案例:公众号推送通知,发送消息到消息中间件服务器,手机微信里的消息中间件客户端,就会自动去把消息获取出来显示。
-
怎么用?
-
Windows
- 下载erlang(RabbitMQ依赖erlang环境) https://www.erlang.org/downloads
- 运行EXE安装
- 与java的jdk一样配置环境变量,只需配置PATH,增加一个erlang安装路径
- 下载rabbit http://dl.bintray.com/rabbitmq/all/rabbitmq-server/
- 安装
- 在安装目录的sbin下打开cmd 输入命令rabbitmq-plugins enable rabbitmq_management安装可视化插件
- 浏览器输入:http://127.0.0.1:15672 进行查看,出现登录界面即成功,账号密码:guest
-
Linux
- 安装之前要装一些必要的库:
sudo yum install build-essential
sudo yum install libncurses5-dev
No package libncurses5-dev available. 如果安装时提示无工作可做,可以终止安装了
sudo yum install libssl-dev
sudo yum install m4
sudo yum install unixodbc unixodbc-dev
sudo yum install freeglut3-dev libwxgtk2.8-dev
sudo yum install xsltproc
sudo yum install fop
sudo yum t install tk8.5
- 执行安装依赖环境:
sudo yum install erlang
查看版本
erl
- 安装RabbitMQ:
sudo yum install rabbitmq-server
查看状态
service rabbitmq-server status
安装可视化插件
sudo rabbitmq-plugins enable rabbitmq_management
sudo chkconfig rabbitmq-server on # 添加开机启动RabbitMQ服务
- sudo /sbin/service rabbitmq-server start # 启动服务
- sudo /sbin/service rabbitmq-server status # 查看服务状态
- sudo /sbin/service rabbitmq-server stop # 停止服务
打开端口
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
- /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
-
模式
-
Fanout(广播模式:一对多)
-
Direct(指定队列,点对点)
-
Topic(订阅模式,多对多,匹配模式)
- Header()
-
怎么实现
-
Fanout模式(广播模式,一对多)
-
发送者和消费者都需要声明工厂并指定服务地址,启动连接,创建通道
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ相关信息-服务地址
- factory.setHost("localhost");
- //连接
- Connection connection = factory.newConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
- //发送者声明交换机,以及消息类型
- channel.exchangeDeclare("fanout_exchange", "fanout");
-
exchangeDeclare方法参数:
参数一String exchange: 交换器名称
参数二String type :交换器类型 "direct", "fanout", "topic"
参数三Boolean durable:是否持久化,设为true可以将交换机存盘
参数四Boolean autoDelete:是否自动删除(一般不设置)
- 生产者把消息交给消息队列
channel.basicPublish("fanout_exchange", "", null, message.getBytes("UTF-8"));
-
basicPublish方法参数:
参数一String exchange:交换器名称
参数二String routingKey:路由关键字
参数三BasicProperties props:消息的基本属性,例如路由头等
参数四byte[] body:消息体
- 关闭通道和连接
channel.close();
- connection.close();
-
接收者 第一步与i相同
- 交换机中存储的队列需要声明一个零时队列
//获取一个临时队列
- String queueName = channel.queueDeclare().getQueue();
- //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
- channel.queueBind(queueName, "fanout_exchange","");
-
通过监听,交换机中有消息则取出来
// 监听那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body)
- throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(name + " 接收到消息 '" + message + "'");
- }
- };
- //自动回复队列 RabbitMQ中的消息确认机制,表示已经取了该条消息了
- channel.basicConsume(queueName, true, consumer);
-
Direct(指定队列,点对点)
-
发送者,直接发送消息,给消息队列,并指定路由关键字
//创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ相关信息-服务地址
- factory.setHost("localhost");
- //连接
- Connection connection = factory.newConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
- //发送者,发送消息到队列中,参数与a)ii.1相同
- channel.basicPublish("","direct_queue", null, message.getBytes("UTF-8"));
-
接收者
//创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ相关信息-服务地址
- factory.setHost("localhost");
- //连接
- Connection connection = factory.newConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
//声明要关注的队列
- channel.queueDeclare("direct_queue", false, false, true, null);
- queueDeclare方法参数:
参数一String queue:队列名称
参数二Boolean durable:是否持久化
参数三 Boolean exclusive:是否在关闭时删除该队列,是否是私有的
参数四 Boolean autoDelete:是否自动删除
参数五Map arguments:更多明细设置
- 剩下的取出消息,关闭连接、通道与a)vi 相同
-
Topic(订阅模式,多对多,匹配模式)
-
发送者
//创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ相关信息-服务地址
- factory.setHost("localhost");
- //创建一个新的连接
- Connection connection = factory.newConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
- //声明交换机,参数与a)1 相同
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//创建路由
- String[] routing_keys = new String[] { "usa.news", "usa.weather",
- "europe.news", "europe.weather" };
- //创建消息
- String[] messages = new String[] { "美国新闻", "美国天气",
- "欧洲新闻", "欧洲天气" };
- //把消息交给队列,给上不同的路由,参数与a)i1相同
for (int i = 0; i < routing_keys.length; i++) {
- String routingKey = routing_keys[i];
- String message = messages[i];
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, message
- .getBytes());
- }
-
接收者
// 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("localhost");
- //创建一个新的连接
- Connection connection = factory.newConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
- //交换机声明(参数为:交换机名称;交换机类型)
- channel.exchangeDeclare(EXCHANGE_NAME,"topic");
- //获取一个临时队列
- String queueName = channel.queueDeclare().getQueue();
- //根据模糊路由接受后缀为news的路由消息
- channel.queueBind(queueName, EXCHANGE_NAME, "*.news");
- queueBind方法参数:
参数一String queue队列名称
参数二String exchange交换器名称
参数三String routingKey路由key
- 剩下的取出消息,关闭连接、通道与a)vi 相同
-
为什么用?
- 消息中间件常见有
-
RocketMQ (阿里系下开源的一款分布式、队列模型的消息中间件)
- 产品新文档缺乏
- 部分阿里产品未开源
-
未在核心实现JMS等接口
-
RabbitMQ(Erlang编写的一个开源的消息队列,支持很多的协议)
-
Erlang语言难度大
- 集群不支持动态扩展
-
ActiveMQ(Apache下的一个子项目)
-
目前核心放在6.0产品Apollo上对于之前的维护较少
- 不适合上千队列的使用
-
Redis (本身支持MQ功能,所以完全可以当做一个轻量级的队列服务)
- 重心更偏向存储
-
Kafka(Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统)
-
ZeroMQ(号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景)
-
综合:
-
按照目前网络上的资料,
RabbitMQ
、activeM
、ZeroMQ
三者中,综合来看,RabbitMQ
是首选
-
ZeroMq
不支持,ActiveMq
和RabbitMq
都支持持久化消息
-
可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件
RabbitMq
/ Kafka
最好,ActiveMq
次之,ZeroMq
最差
-
并发
RabbitMQ
最高erlang
语言天生具备高并发高可用
-
订阅模式发送者
接收者