RabbitMQ入门与AMQP协议简介

概述

RabbitMQ是一个开源的消息代理和队列服务器, 用来通过普通协议在完全不同的应用之间共享数据, RabbitMQ使用Erlang语言来编写的, 并且RabbitMQ是基于AMQP协议的

优点

  • 开源, 性能有效, 稳定性好
  • 提供可靠性消息投递模式(confirm), 返回模式(return)等
  • 与Spring完美整合, API丰富
  • 集群模式丰富, 支持表达式配置, 高可用HA模式, 镜像队列模型
  • 可以保证数据不丢失的前提下做到高可靠性, 可用性

RabbitMQ高性能的原因

  • Erlang语言最初用于交换机领域的架构模式, 这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
  • Erlang的优点 : Erlang有着和原生Socket一样的延迟

AMQP协议

AMQP简介

  • AMQP全称 : Advanced Message Queuing Protocol
  • 中文 : 高级消息队列协议
  • AMQP定义 : 是具有现代特征的二进制协议, 是一个提供统一消息服务的应用层标准高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计

AMQP协议模型

RabbitMQ入门与AMQP协议简介

  1. 生产者生产的消息通过Server->Virtual Host->Exchange
  2. Exchange和Queue之间进行绑定
  3. 消费者只需要监听Queue消息队列即可

下面会对AMQP的这些概念一一进行些介绍

AMQP核心概念

  • Server : 又称Broker, 接受客户端连接, 实现AMQP实体服务
  • Connection : 连接, 应用程序与Broker的网络连接
  • Channel : 网络信道, 几乎所有的操作都在Channel中进行, Channel是进行消息读写的通道。客户端可以建立多个Channel, 每个Channel代表一个会话任务。
  • Message : 消息, 服务器和应用程序之间传送的数据, 有Properties和Body组成。Properties可以对消息进行修饰, 比如消息的优先级, 延迟等高级特性; Body就是消息体内容。
  • Virtual Host : 虚拟地址, 用于进行逻辑隔离, 最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue, 同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange : 交换机, 用于接收消息, 根据路由键转发消息到绑定的队列
  • Binding : Exchange和Queue之间的虚拟连接, binding中可以包含routing key
  • Routing Key : 一个路由规则, 虚拟机可用它来确定如何路由一个特定消息
  • Queue : 也成Message Queue, 消息队列, 用于保存消息并将它们转发给消费者

RabbitMQ整体架构

RabbitMQ入门与AMQP协议简介

  • 生产者只需要将消息发送到Exchange即可
  • 消费者只需要监听对应的消息队列即可
  • Exchange绑定多个Queue时, 要通过Routing Key进行路由

RabbitMQ消息流转

RabbitMQ入门与AMQP协议简介

RabbitMQ安装与使用

  • 官网地址 : http://www.rabbitmq.com/
  • 需要安装Erlang安装包, 配置Erlang环境
  • 需要安装Linux必须的一些依赖包, 如果缺少一些依赖包, 可以自行百度一下, 我这边虚拟机之前装其他软件, 已经安装了一些依赖
  • 下载RabbitMQ必须的安装包
  • 配置文件修改

安装步骤

为了安装方便, 这里就使用rpm的方式进行安装

  1. 安装Erlang环境

    wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

    yum install erlang-18.3-1.el7.centos.x86_64.rpm

  2. 安装socat依赖

    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

    yum install socat-1.7.3.2-5.el7.lux.x86_64.rpm

  3. 安装RabbitMQ

    wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

    yum install rabbitmq-server-3.6.5-1.noarch.rpm

  4. 由于我的软件都安装在/usr/local/software目录下,所以我在这里建个软链接方便访问

    ln -s /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5 /usr/local/software/rabbitmq

  5. 修改配置文件

    vim /usr/local/software/rabbitmq/ebin/rabbit.app

RabbitMQ启动

  • 服务的启动 : rabbitmq-server start &

  • 服务的停止 : rabbitmqctl stop_app

  • 管理插件 : rabbitmq-plugins enable rabbitmq_management

    RabbitMQ管控台默认端口为15672, 访问地址 : http://192.168.72.138:15672

命令行与管控台

  • 关闭应用 : rabbitmqctl stop_app
  • 启动应用 : rabbitmqctl start_app
  • 节点状态 : rabbitmqctl status

用户相关 :

  • 添加用户 : rabbitmqctl add_user username password
  • 列出所有用户 : rabbitmqctl list_users
  • 删除用户 : rabbitmqctl delete_user username
  • 清除用户权限 : rabbitmqctl clear_permissions -p vhostpath username
  • 列出用户权限 : rabbitmqctl list_user_permissions username
  • 修改密码 : rabbitmqctl change_password username newpassword
  • 设置用户权限 : rabbitmqctl set_permissions -p vhostpath username

虚拟主机相关 :

  • 创建虚拟主机 : rabbitmqctl add_vhost vhostpath
  • 列出所有虚拟主机 : rabbitmqctl list_vhosts
  • 列出虚拟主机上所有权限 : rabbitmqctl list_permissions -p vhostpath
  • 删除虚拟主机 : rabbitmqctl delete_vhostpath

队列相关 :

  • 查看所有队列信息 : rabbitmqctl list_queues
  • 清除队列里的消息 : rabbitmqctl -p vhostpath purge_queue blue

高级操作(主要是集群相关) :

  • 移除所有数据 : rabbitmqctl reset, 要在rabbitmqctl stop_app之后使用过

  • 组成集群命令 : rabbitmqctl join_cluster [–ram]

    –ram表示指定数据存储模式, --ram表示数据存储到内存中

  • 查看集群状态 : rabbitmqctl cluster_status

  • 修改集群节点的存储形式 : rabbitmqctl change_cluster_node_type disc | ram

    ram : 内存

    disc : 磁盘

  • 忘记节点(移除节点) : rabbitmqctl forget_cluster_node [–offline]

    offline表示可以在服务没启动时使用

  • 修改节点名称 : rabbitmqctl rename_cluster_node oldnode1 newnode1 oldnode2 new node2

快速入门

  • ConnectionFactory : 获取连接工厂
  • Connection : 一个连接
  • Channel : 数据通信信道, 可发送和接收消息
  • Queue : 具体的消息存储队列
  • Producer & Consumer生产者和消费者

添加Maven依赖

这里为了简单起见, 就直接使用Maven来引入jar包

      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.6.5</version>
      </dependency>

生产者代码

package com.qiyexue.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 *
 * @author 七夜雪
 * @create 2018-12-13 20:43
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂, 设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 使用connection创建channel
        Channel channel = connection.createChannel();
        
        // 4. 通过channel发送消息
        String msg = "hello rabbitmq!";
        for (int i = 0; i < 5; i++) {
            // 不指定exchange的情况下, 使用默认的exchange, routingKey与队列名相等
            channel.basicPublish("", "test01", null, msg.getBytes());
        }

        // 5. 关闭连接
        channel.close();
        connection.close();
    }

}

消费者代码

package com.qiyexue.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
 * 消费者
 *
 * @author 七夜雪
 * @create 2018-12-13 20:57
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂, 设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 使用connection创建channel
        Channel channel = connection.createChannel();

        // 4. 声明(创建)一个队列
        String queueName = "test01";
        channel.queueDeclare(queueName,true, false, false, null);

        // 5. 创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 6. 设置channel
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            // 7. 获取消息
            Delivery delivery = consumer.nextDelivery();
            System.err.println(new String(delivery.getBody()));
        }
        
    }

}