RabbitMQ

  1. 是什么?
    1. 消息中间件
    2. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写
    3. 官网:https://www.rabbitmq.com/
       
  2. 干什么?
    1. 案例:公众号推送通知,发送消息到消息中间件服务器,手机微信里的消息中间件客户端,就会自动去把消息获取出来显示。
       
  3. 怎么用?
    1. Windows
      1. 下载erlang(RabbitMQ依赖erlang环境) https://www.erlang.org/downloads
      2. 运行EXE安装
      3. 与java的jdk一样配置环境变量,只需配置PATH,增加一个erlang安装路径
      4. 下载rabbit http://dl.bintray.com/rabbitmq/all/rabbitmq-server/
      5. 安装
      6. 在安装目录的sbin下打开cmd 输入命令rabbitmq-plugins enable rabbitmq_management安装可视化插件
      7. 浏览器输入:http://127.0.0.1:15672 进行查看,出现登录界面即成功,账号密码:guest
    2. Linux
      1. 安装之前要装一些必要的库:
        sudo yum install build-essential
        sudo yum install libncurses5-dev
           No package libncurses5-dev available. 如果安装时提示无工作可做,可以终止安装了
        sudo yum install libssl-dev
        sudo yum install m4
        sudo yum install unixodbc unixodbc-dev
        sudo yum install freeglut3-dev libwxgtk2.8-dev
        sudo yum install xsltproc
        sudo yum install fop
        sudo yum t install tk8.5
         
      2. 执行安装依赖环境:
        sudo yum install erlang
        查看版本
        erl
         
      3. 安装RabbitMQ:
        sudo yum install rabbitmq-server
        查看状态
        service rabbitmq-server status
        安装可视化插件
        sudo rabbitmq-plugins enable rabbitmq_management
        sudo chkconfig rabbitmq-server on  # 添加开机启动RabbitMQ服务
      4. sudo /sbin/service rabbitmq-server start # 启动服务
      5. sudo /sbin/service rabbitmq-server status  # 查看服务状态
      6. sudo /sbin/service rabbitmq-server stop   # 停止服务
        打开端口
        /sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT  
      7. /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT  
  4. 模式
    1. Fanout(广播模式:一对多)
      1. RabbitMQ
    2. Direct(指定队列,点对点)
      1. RabbitMQ
    3. Topic(订阅模式,多对多,匹配模式)
      1. RabbitMQ
    4. Header()
  5. 怎么实现
    1. Fanout模式(广播模式,一对多)
      1. 发送者和消费者都需要声明工厂并指定服务地址,启动连接,创建通道
      2. //创建连接工厂
      3. ConnectionFactory factory = new ConnectionFactory();
      4. //设置RabbitMQ相关信息-服务地址
      5. factory.setHost("localhost");
      6. //连接
      7. Connection connection = factory.newConnection();
      8. //创建一个通道
      9. Channel channel = connection.createChannel();
      10. //发送者声明交换机,以及消息类型
      11. channel.exchangeDeclare("fanout_exchange", "fanout");
        1. exchangeDeclare方法参数:
          参数一String exchange 交换器名称
          参数二String type :交换器类型 "direct", "fanout", "topic"
          参数三Boolean durable:是否持久化,设为true可以将交换机存盘
          参数四Boolean autoDelete:是否自动删除(一般不设置)
      12. 生产者把消息交给消息队列
        channel.basicPublish("fanout_exchange", "", null, message.getBytes("UTF-8"));
        1. basicPublish方法参数:
          参数一String exchange:交换器名称
          参数二String routingKey:路由关键字
          参数三BasicProperties props:消息的基本属性,例如路由头等
          参数四byte[] body:消息体
      13. 关闭通道和连接
        channel.close();
      14. connection.close();
      15. 接收者 第一步与i相同
      16. 交换机中存储的队列需要声明一个零时队列
        //获取一个临时队列
      17. String queueName = channel.queueDeclare().getQueue();
      18. //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
      19. channel.queueBind(queueName, "fanout_exchange","");
      20. 通过监听,交换机中有消息则取出来
        //
        监听那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
      21. Consumer consumer = new DefaultConsumer(channel) {
      22.     @Override
      23.     public void handleDelivery(String consumerTag, Envelope envelope,
      24.                                AMQP.BasicProperties properties, byte[] body)
      25.             throws IOException {
      26.         String message = new String(body, "UTF-8");
      27.         System.out.println(name + " 接收到消息 '" + message + "'");
      28.     }
      29. };
      30. //自动回复队列 RabbitMQ中的消息确认机制,表示已经取了该条消息了
      31. channel.basicConsume(queueName, true, consumer);
    2. Direct(指定队列,点对点)
      1. 发送者,直接发送消息,给消息队列,并指定路由关键字
        //创建连接工厂
      2. ConnectionFactory factory = new ConnectionFactory();
      3. //设置RabbitMQ相关信息-服务地址
      4. factory.setHost("localhost");
      5. //连接
      6. Connection connection = factory.newConnection();
      7. //创建一个通道
      8. Channel channel = connection.createChannel();
        1. //发送者,发送消息到队列中,参数与a)ii.1相同
        2. channel.basicPublish("","direct_queue", null, message.getBytes("UTF-8"));
      9. 接收者
        //创建连接工厂
      10. ConnectionFactory factory = new ConnectionFactory();
      11. //设置RabbitMQ相关信息-服务地址
      12. factory.setHost("localhost");
      13. //连接
      14. Connection connection = factory.newConnection();
      15. //创建一个通道
      16. Channel channel = connection.createChannel();
        //声明要关注的队列
      17. channel.queueDeclare("direct_queue", false, false, true, null);
        1. queueDeclare方法参数:
          参数一String queue:队列名称
          参数二Boolean durable:是否持久化
          参数三 Boolean exclusive:是否在关闭时删除该队列,是否是私有的
          参数四 Boolean autoDelete:是否自动删除
          参数五Map arguments:更多明细设置
        2. 剩下的取出消息,关闭连接、通道与a)vi 相同
    3. Topic(订阅模式,多对多,匹配模式)
      1. 发送者
        //创建连接工厂
      2. ConnectionFactory factory = new ConnectionFactory();
      3. //设置RabbitMQ相关信息-服务地址
      4. factory.setHost("localhost");
      5. //创建一个新的连接
      6. Connection connection = factory.newConnection();
      7. //创建一个通道
      8. Channel channel = connection.createChannel();
      9. //声明交换机,参数与a)1 相同
      10. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //创建路由
      11. String[] routing_keys = new String[] { "usa.news", "usa.weather",
      12.         "europe.news", "europe.weather" };
      13. //创建消息
      14. String[] messages = new String[] { "美国新闻", "美国天气",
      15.         "欧洲新闻", "欧洲天气" };
      16. //把消息交给队列,给上不同的路由,参数与a)i1相同
        for (int i = 0; i < routing_keys.length; i++) {
      17.     String routingKey = routing_keys[i];
      18.     String message = messages[i];
      19.     channel.basicPublish(EXCHANGE_NAME, routingKey, null, message
      20.             .getBytes());
      21. }
      22. 接收者
        // 创建连接工厂
      23. ConnectionFactory factory = new ConnectionFactory();
      24. //设置RabbitMQ地址
      25. factory.setHost("localhost");
      26. //创建一个新的连接
      27. Connection connection = factory.newConnection();
      28. //创建一个通道
      29. Channel channel = connection.createChannel();
      30. //交换机声明(参数为:交换机名称;交换机类型)
      31. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
      32. //获取一个临时队列
      33. String queueName = channel.queueDeclare().getQueue();
      34. //根据模糊路由接受后缀为news的路由消息
      35. channel.queueBind(queueName, EXCHANGE_NAME, "*.news");
        1. queueBind方法参数:
          参数一String queue队列名称
          参数二String exchange交换器名称
          参数三String routingKey路由key
        2. 剩下的取出消息,关闭连接、通道与a)vi 相同
  6. 为什么用?
    1. 消息中间件常见有
      1. RocketMQ (阿里系下开源的一款分布式、队列模型的消息中间件)
        1. 产品新文档缺乏
        2. 部分阿里产品未开源
        3. 未在核心实现JMS等接口
      2. RabbitMQ(Erlang编写的一个开源的消息队列,支持很多的协议)
        1. Erlang语言难度大
        2. 集群不支持动态扩展
      3. ActiveMQ(Apache下的一个子项目)
        1. 目前核心放在6.0产品Apollo上对于之前的维护较少
        2. 不适合上千队列的使用
      4. Redis (本身支持MQ功能,所以完全可以当做一个轻量级的队列服务)
        1. 重心更偏向存储
      5. Kafka(Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统)
      6. ZeroMQ(号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景)
      7. 综合:
        1. 按照目前网络上的资料,RabbitMQ activeM ZeroMQ 三者中,综合来看,RabbitMQ 是首选
        2. ZeroMq 不支持,ActiveMq RabbitMq 都支持持久化消息
        3. 可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差
        4. 并发RabbitMQ 最高erlang 语言天生具备高并发高可用
  7. ​​​​​​​​​​​​​订阅模式​发送者RabbitMQ
    接收者RabbitMQ