RabbitMQ(客户端开发)

连接 RabbitMQ

常用的连接RabbitMQ的方式有两种:

        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT) ;
        factory.setUsername( "SpringCloud" );
        factory.setPassword( "123456789");
        Connection connection=factory.newConnection();//创建连接
        ConnectionFactory factory=new ConnectionFactory();
        factory.setUri("amqp://SpringCloud:[email protected]"+IP_ADDRESS+":"+PORT+"");
        Connection connection=factory.newConnection();//创建连接

    Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享, 应用程序应该为每一个线程开辟一个 Channel 。某些情况下 Channel 的操作可以并发运行,但 是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认( publisher confirm)机制的运行,所以多线程间共享 Channel 实例是非线程安全的。

使用交换器和队列

        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        //创建一个持久化、非排他的、非自动删除的队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //将交换器与队列通过路由键绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);

   这里的队列被声明为持久化的、非排他的、 非自动删除的,而且也被分配另一个确定的己知的名称(由客户端分配而非 RabbitMQ 自动生成)。 

        // 创建一个 type="direct" 、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        //创建了一个非持久化的、排他的、自动删除的队列
        String queueName=channel.queueDeclare().getQueue();
        //将交换器与队列通过路由键绑定
        channel.queueBind(queueName,EXCHANGE_NAME,ROUTING_KEY);

    上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器,同时也创建了一 个非持久化的、排他的、自动删除的队列(此队列的名称由 RabbitMQ 自动生成)。这里的交换器和队列也都没有设置特殊的参数。 上面的代码也展示了如何使用路由键将队列和交换器绑定起来。上面声明的队列具备如下 特性 只对当前应用中同一个 Connection 层面可用,同一个 Connection 的不同 Channel 可共用,并且也会在应用连接断开时自动删除。

    除了队列与交换器绑定还有交换器与交换器之间的绑定:exchangeBind();

        channel.exchangeDeclare( " source " , "direct " , false , true , null) ;
        channel.exchangeDeclare( "destination " , " fanout " , false , true , null);
        channel.exchangeBind( " destination " , " source " , " exKey");
        channel.queueDeclare( "queue " , false , false , true , null);
        channel.queueBind( " queue " , "destination","" ) ;
        channel.basicPublish( " source " , " exKey " , null , " exToExDemo ".getBytes()) ;

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换 destination,并把消息转发到 destination中,进而存储在 destination 绑定的队列 queue中。

RabbitMQ(客户端开发)

按照 RabbitMQ 官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。

发送消息

如果要发送一个消息,可以使用 Channel 类的 basicPublish 方法:

        String message="发送的消息@@@@@@@!!!!";
        channel.basicPublish(EXCHANGE_NAME , ROUTING_KEY ,
                MessageProperties.PERSISTENT_TEXT_PLAIN ,
                message.getBytes()) ;

消费消息

     RabbitMQ 的消费模式分两种:(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume 进行消费,而拉模式则是调用 Basic.Get 进行消费。

推模式

    接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。当调用与Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签 (consumerTag) 来区分彼此 ,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分,关键如下:

        // 设置客户端最多接收未被 ack 的消息的个数
        boolean autoAck=false;
        channel.basicQos(64);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String routingKey = envelope.getRoutingKey( );
                String contentType = properties.getContentType() ;
                long deliveryTag = envelope.getDeliveryTag() ;
                System.out.println(routingKey+"---"+deliveryTag+"---"+contentType);
                System.out.println(" 接受消息: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME ,autoAck, consumer);

    上面代码中显式地设置 autoAck为false,然后在接收到消息之后进行显式 ack 操作(channel.basicAck ), 对于消费者来说这 个设置是非常必要的,可以防止消息不必要的丢失。

RabbitMQ(客户端开发)

    和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel 等。

    每个 Channel 都拥有自己独立的线程。最常用的做法是一个Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者, 但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被"耽搁"。

拉模式

   通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetRespone。

        GetResponse response = channel.basicGet(QUEUE_NAME , false) ;
        System.out.println(new String(response.getBody()));
        channel.basicAck(response.getEnvelope().getDeliveryTag() , false);

RabbitMQ(客户端开发)

    Basic.Consume 将信道 (Channel) 直为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos 的限制.如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

消费端的确认与拒绝

    消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除) 。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

    当autoAck 参数置为 false ,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

    如果想明确拒绝当前的消息而不是确认:消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

  void basicReject(long deliveryTag, boolean requeue) throws IOException;

    如果想要批量拒绝消息 ,则可以使用 Basic.Nack 这个命令,消费者客户端可以调用 channel.basicNack 方法来实现。

    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

    将channel.basicReject 或者 channel.basicNack 中的 requeue 设直为 false ,可以启用"死信队列"的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

    对于 requeue AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性 。其对应的客户端方法为:

     Basic.RecoverOk basicRecover() throws IOException;

    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

    这个 channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息。如果 requeue 参数设置为 true ,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false ,那么同一条消 息会被分配给与之前相同的消费者。默认情况下,如果不设置 requeue 这个参数,相当于 channel.basicRecover(true) ,即 requeue 默认为 true。

关闭连接

在应用程序使用完之后,需要关闭连接,释放资源:

        channel.close();
        connection.close();

    AMQP 协议中的 Connection、Channel 采用同样的方式来管理网络失败、内部错误和 显式地关闭连接。 Connection和Channel 所具备的生命周期如下所述。 

  • Open:开启状态,代表当前对象可以使用。 
  • Closing:正在关闭状态。当前对象被显式地通知调用关闭方法( shutdown) ,这样就产生了一个关闭请求让其内部对象进行相应的操作, 并等待这些关闭操作的完成。
  • Closed:已经关闭状态。当前对象己经接收到所有的内部对象己完成关闭动作的通知, 并且其也关闭了自身。

    Connection和Channel 最终都是会成为 Closed 的状态,不论是程序正常调用的关闭 方法,或者是客户端的异常,再或者是发生了网络异常。

参考《RabbitMQ实战指南》

附:

发送端代码:

package com.example.rabbitmq;

import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.AMQImpl;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routing_key_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.31.25";
    private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672

    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {

        ConnectionFactory factory=new ConnectionFactory();
//        factory.setUri("amqp://SpringCloud:[email protected]"+IP_ADDRESS+":"+PORT+"");
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT) ;
        factory.setUsername( "SpringCloud" );
        factory.setPassword( "123456789");
        Connection connection=factory.newConnection();//创建连接
        Channel channel=connection.createChannel();//创建通道
        // 创建一个 type="direct" 、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        //创建一个持久化、非排他的、非自动删除的队列
//        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //创建了一个非持久化的、排他的、自动删除的队列
        String queueName=channel.queueDeclare().getQueue();
        //将交换器与队列通过路由键绑定
        channel.queueBind(queueName,EXCHANGE_NAME,ROUTING_KEY);
        //发送一条持久化的消息
        String message="发送的消息@@@@@@@!!!!";
        channel.basicPublish(EXCHANGE_NAME , ROUTING_KEY ,
                MessageProperties.PERSISTENT_TEXT_PLAIN ,
                message.getBytes()) ;
        //关闭资源
        channel.close();
        connection.close();

    }

}

接受端代码:

package com.example.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.31.25";
    private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("SpringCloud");
        factory.setPassword("123456789");
        Connection connection = factory.newConnection(new Address[]{
                new Address(IP_ADDRESS, PORT)});//创建连接
        Channel channel = connection.createChannel();//创建通道

//        GetResponse response = channel.basicGet(QUEUE_NAME , false) ;
//        System.out.println(new String(response.getBody()));
//        channel.basicAck(response.getEnvelope().getDeliveryTag() , false);


        // 设置客户端最多接收未被 ack 的消息的个数
        boolean autoAck=false;
        channel.basicQos(64);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String routingKey = envelope.getRoutingKey( );
                String contentType = properties.getContentType() ;
                long deliveryTag = envelope.getDeliveryTag() ;
                System.out.println(routingKey+"---"+deliveryTag+"---"+contentType);
                System.out.println(" 接受消息: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME ,autoAck, consumer);
        //等待回调函数执行完毕之后 关闭资源
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        channel.close();
        connection.close();
    }
}