ActiveMQ 虚拟主题Visual Topic
概述
虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。ActiveMQ支持两种方式:1:虚拟主题(Virtual Topics)
2:组合 Destinations(Composite Destinations)
为何使用虚拟主题
ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。
queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高
为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能
如何使用虚拟主题
1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。
例如VirtualTopic.Orders,代码示例如下:
Topic destination = session.createTopic("VirtualTopic.Orders");//创建主题,但是命名必须是VirtualTopic.开头
public class PersistanceSender { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("VirtualTopic.Mytopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置传送模式为持久模式,默认为非持久 connection.start(); for(int i=0; i<3; i++) { TextMessage message = session.createTextMessage("message--"+i); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); } }
例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理
Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。
又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
代码示例如下:
Destination destination = session.createQueue("Consumer.A.VirtualTopic.Orders");//Consumer.是默认配置的开头,A带面每一个订阅者
public class QueueReceiver { public static void main(String[] args) throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mytopic"); MessageConsumer consumer = session.createConsumer(destination); int i=0; while(i<3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消 息:" + message.getText()); } session.close(); connection.close(); } }
每一个订阅者也许要先已阅该Queue才能接受到虚拟主题的消息
3:默认虚拟主题的前缀是 :VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>
自定义消费虚拟地址可以改,比如下面的配置就把它修改了。
xml配置示例如下:
<broker xmlns="http://activemq.apache.org/schema/core">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/><!-- 修改的Consumer的开头格式-->
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>