RabbitMQ入门与AMQP协议简介
文章目录
概述
RabbitMQ是一个开源的消息代理和队列服务器, 用来通过普通协议在完全不同的应用之间共享数据, RabbitMQ使用Erlang语言来编写的, 并且RabbitMQ是基于AMQP协议的
优点
- 开源, 性能有效, 稳定性好
- 提供可靠性消息投递模式(confirm), 返回模式(return)等
- 与Spring完美整合, API丰富
- 集群模式丰富, 支持表达式配置, 高可用HA模式, 镜像队列模型
- 可以保证数据不丢失的前提下做到高可靠性, 可用性
RabbitMQ高性能的原因
- Erlang语言最初用于交换机领域的架构模式, 这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
- Erlang的优点 : Erlang有着和原生Socket一样的延迟
AMQP协议
AMQP简介
- AMQP全称 : Advanced Message Queuing Protocol
- 中文 : 高级消息队列协议
- AMQP定义 : 是具有现代特征的二进制协议, 是一个提供统一消息服务的应用层标准高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计
AMQP协议模型
- 生产者生产的消息通过Server->Virtual Host->Exchange
- Exchange和Queue之间进行绑定
- 消费者只需要监听Queue消息队列即可
下面会对AMQP的这些概念一一进行些介绍
AMQP核心概念
- Server : 又称Broker, 接受客户端连接, 实现AMQP实体服务
- Connection : 连接, 应用程序与Broker的网络连接
- Channel : 网络信道, 几乎所有的操作都在Channel中进行, Channel是进行消息读写的通道。客户端可以建立多个Channel, 每个Channel代表一个会话任务。
- Message : 消息, 服务器和应用程序之间传送的数据, 有Properties和Body组成。Properties可以对消息进行修饰, 比如消息的优先级, 延迟等高级特性; Body就是消息体内容。
- Virtual Host : 虚拟地址, 用于进行逻辑隔离, 最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue, 同一个Virtual Host里面不能有相同名称的Exchange或Queue
- Exchange : 交换机, 用于接收消息, 根据路由键转发消息到绑定的队列
- Binding : Exchange和Queue之间的虚拟连接, binding中可以包含routing key
- Routing Key : 一个路由规则, 虚拟机可用它来确定如何路由一个特定消息
- Queue : 也成Message Queue, 消息队列, 用于保存消息并将它们转发给消费者
RabbitMQ整体架构
- 生产者只需要将消息发送到Exchange即可
- 消费者只需要监听对应的消息队列即可
- Exchange绑定多个Queue时, 要通过Routing Key进行路由
RabbitMQ消息流转
RabbitMQ安装与使用
- 官网地址 : http://www.rabbitmq.com/
- 需要安装Erlang安装包, 配置Erlang环境
- 需要安装Linux必须的一些依赖包, 如果缺少一些依赖包, 可以自行百度一下, 我这边虚拟机之前装其他软件, 已经安装了一些依赖
- 下载RabbitMQ必须的安装包
- 配置文件修改
安装步骤
为了安装方便, 这里就使用rpm的方式进行安装
-
安装Erlang环境
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
yum install erlang-18.3-1.el7.centos.x86_64.rpm
-
安装socat依赖
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
yum install socat-1.7.3.2-5.el7.lux.x86_64.rpm
-
安装RabbitMQ
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
yum install rabbitmq-server-3.6.5-1.noarch.rpm
-
由于我的软件都安装在/usr/local/software目录下,所以我在这里建个软链接方便访问
ln -s /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5 /usr/local/software/rabbitmq
-
修改配置文件
vim /usr/local/software/rabbitmq/ebin/rabbit.app
RabbitMQ启动
-
服务的启动 : rabbitmq-server start &
-
服务的停止 : rabbitmqctl stop_app
-
管理插件 : rabbitmq-plugins enable rabbitmq_management
RabbitMQ管控台默认端口为15672, 访问地址 : http://192.168.72.138:15672
命令行与管控台
- 关闭应用 : rabbitmqctl stop_app
- 启动应用 : rabbitmqctl start_app
- 节点状态 : rabbitmqctl status
用户相关 :
- 添加用户 : rabbitmqctl add_user username password
- 列出所有用户 : rabbitmqctl list_users
- 删除用户 : rabbitmqctl delete_user username
- 清除用户权限 : rabbitmqctl clear_permissions -p vhostpath username
- 列出用户权限 : rabbitmqctl list_user_permissions username
- 修改密码 : rabbitmqctl change_password username newpassword
- 设置用户权限 : rabbitmqctl set_permissions -p vhostpath username
虚拟主机相关 :
- 创建虚拟主机 : rabbitmqctl add_vhost vhostpath
- 列出所有虚拟主机 : rabbitmqctl list_vhosts
- 列出虚拟主机上所有权限 : rabbitmqctl list_permissions -p vhostpath
- 删除虚拟主机 : rabbitmqctl delete_vhostpath
队列相关 :
- 查看所有队列信息 : rabbitmqctl list_queues
- 清除队列里的消息 : rabbitmqctl -p vhostpath purge_queue blue
高级操作(主要是集群相关) :
-
移除所有数据 : rabbitmqctl reset, 要在rabbitmqctl stop_app之后使用过
-
组成集群命令 : rabbitmqctl join_cluster [–ram]
–ram表示指定数据存储模式, --ram表示数据存储到内存中
-
查看集群状态 : rabbitmqctl cluster_status
-
修改集群节点的存储形式 : rabbitmqctl change_cluster_node_type disc | ram
ram : 内存
disc : 磁盘
-
忘记节点(移除节点) : rabbitmqctl forget_cluster_node [–offline]
offline表示可以在服务没启动时使用
-
修改节点名称 : rabbitmqctl rename_cluster_node oldnode1 newnode1 oldnode2 new node2
快速入门
- ConnectionFactory : 获取连接工厂
- Connection : 一个连接
- Channel : 数据通信信道, 可发送和接收消息
- Queue : 具体的消息存储队列
- Producer & Consumer生产者和消费者
添加Maven依赖
这里为了简单起见, 就直接使用Maven来引入jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
生产者代码
package com.qiyexue.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*
* @author 七夜雪
* @create 2018-12-13 20:43
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 使用connection创建channel
Channel channel = connection.createChannel();
// 4. 通过channel发送消息
String msg = "hello rabbitmq!";
for (int i = 0; i < 5; i++) {
// 不指定exchange的情况下, 使用默认的exchange, routingKey与队列名相等
channel.basicPublish("", "test01", null, msg.getBytes());
}
// 5. 关闭连接
channel.close();
connection.close();
}
}
消费者代码
package com.qiyexue.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 消费者
*
* @author 七夜雪
* @create 2018-12-13 20:57
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 使用connection创建channel
Channel channel = connection.createChannel();
// 4. 声明(创建)一个队列
String queueName = "test01";
channel.queueDeclare(queueName,true, false, false, null);
// 5. 创建消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 6. 设置channel
channel.basicConsume(queueName, true, consumer);
while (true) {
// 7. 获取消息
Delivery delivery = consumer.nextDelivery();
System.err.println(new String(delivery.getBody()));
}
}
}