ActiveMQ 虚拟主题Visual Topic

概述
虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。ActiveMQ支持两种方式:
1:虚拟主题(Virtual Topics)
2:组合 Destinations(Composite Destinations)
为何使用虚拟主题
ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。
queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高

为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能

如何使用虚拟主题

1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。

例如VirtualTopic.Orders,代码示例如下:

Topic destination = session.createTopic("VirtualTopic.Orders");//创建主题,但是命名必须是VirtualTopic.开头

public class PersistanceSender {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("VirtualTopic.Mytopic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置传送模式为持久模式,默认为非持久
        connection.start();
        for(int i=0; i<3; i++) {
            TextMessage message = session.createTextMessage("message--"+i);
            Thread.sleep(1000);
            //通过消息生产者发出消息
            producer.send(message);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

ActiveMQ 虚拟主题Visual Topic

2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。
例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理
Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。
又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
代码示例如下:

Destination destination = session.createQueue("Consumer.A.VirtualTopic.Orders");//Consumer.是默认配置的开头,A带面每一个订阅者

public class QueueReceiver {
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = cf.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mytopic");
        MessageConsumer consumer = session.createConsumer(destination);
        int i=0;
        while(i<3) {
            i++;
            TextMessage message = (TextMessage) consumer.receive();
            session.commit();
            System.out.println("收到消 息:" + message.getText());
        }
        session.close();
        connection.close();
    }
}

每一个订阅者也许要先已阅该Queue才能接受到虚拟主题的消息

ActiveMQ 虚拟主题Visual Topic

3:默认虚拟主题的前缀是 :VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>
自定义消费虚拟地址可以改,比如下面的配置就把它修改了。
xml配置示例如下:
<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/><!-- 修改的Consumer的开头格式-->
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>