【ActiveMQ】五 基本元素高级特性
Destination 高级特性
一 组合队列
组合队列 Composite Destinations 组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过composite destinations在一个操作中同时向多个queue发送消息。
1.1 代码实现
客户端实现的方式 在composite destinations中,多个destination之间采用“,”分割。例如:
Queue queue= new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如: Queue queue= new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
Destination destination = session.createQueue("my-queue,my-queue2");
1.2 xml配置
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueuename="MY.QUEUE">
<forwardTo>
<queue physicalName="my-queue" />
<queue physicalName="my-queue2" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
二 队列失效
一般情况下,ActiveMQ的queue在不使用之后,可以通过web控制台或是JMX方式来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用的队列(一定时间内为空的队列)并删除掉,回 收响应资源。可以如下配置:
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
说明:
schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0 inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60
gcInactiveDestinations: 设置删除掉不活动队列,默认为false
三 虚拟目的地
3.1 为何使用虚拟主题
虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。ActiveMQ支持两种方式:
1:虚拟主题(Virtual Topics)
2:组合 Destinations(Composite Destinations)
ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。 queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以, 既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高,为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能
3.2 如何使用虚拟主题
1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如 VirtualTopic.Orders,代码示例如下:
Topic destination = session.createTopic("VirtualTopic.Orders");
2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明 自己的身份即可实现消费端应用分组。 例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理 Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者 共同来承担消费任务。
Destination destination= session.createQueue("Consumer.A.VirtualTopic.Orders");
3.3 设置虚拟地址前缀
默认虚拟主题的前缀是:VirtualTopic.> 自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.> 自定义消费虚拟地址可以改,比如下面的配置就把它修改了。 xml配置示例如下:
<broker xmlns="http://activemq.apache.org/schema/core">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopicname=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
3.3.1 producer
public class VirtualSender {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");
Connection createConnection = factory.createConnection();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("VirtualTopic.Test");
MessageProducer createProducer = createSession.createProducer(createTopic);
createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
createConnection.start();
for(int i=0;i<3;i++){
TextMessage createTextMessage = createSession.createTextMessage("message"+i);
createProducer.send(createTextMessage);
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.3.2 receiver
(1) queueA
public class PersisiTopicReceiver {
public static void main(String[] args) throws JMSException, InterruptedException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.setClientID("B_ID");
createConnection.start();
final Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
for(int i=0;i<1;i++){
Queue destination = createSession.createQueue("Consumer.A.VirtualTopic.Test");
MessageConsumer consumer = createSession.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage msg = (TextMessage)message;
System.out.println(msg);
try {
Thread.sleep(2000);
createSession.commit();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
}
}
(2) queueB
public class PersisiTopicReceiver2 {
public static void main(String[] args) throws JMSException, InterruptedException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.setClientID("B_ID2");
createConnection.start();
final Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
for(int i=0;i<1;i++){
Queue destination = createSession.createQueue("Consumer.B.VirtualTopic.Test");
MessageConsumer consumer = createSession.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage msg = (TextMessage)message;
System.out.println(msg);
try {
createSession.commit();
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
}
}
3.3.3 result
如下是通过一个构建一个虚拟主题,生成多个消费队列,实现负载均衡,
3.4 镜像队列
ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望 能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是为系统中每个queue都进行如此的配置可能会很麻烦。
使用 ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一 个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用 Mirrored Queues,首先要将Broker的useMirroredQueues属性设置成true,然后可
以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是 “VirtualTopic.Mirror.”。 比如修改后缀的配置示例如下:
<broker>
<destinationInterceptors>
<mirroredQueuecopyMessage="true" postfix=".qmirror" prefix=""/>
</destinationInterceptors>
</broker>
3.4.1 producer
public class MsgSendder {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("failover:(tcp://192.168.232.128:61616,tcp://192.168.232.128:61617)?randomize=false");
Connection connection = ConnectionFactoryconnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue,my-queue2");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 40; i++) {
TextMessage message = session.createTextMessage("message--" + i);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
3.4.2 consumer
public class PersisiTopicReceiver {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.setClientID("订阅者B_ID");
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("my-queue.qmirror");
TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "T1");
TextMessage message = (TextMessage)createDurableSubscriber.receive();
while(message!=null){
System.out.println(message.getText());
message = (TextMessage)createDurableSubscriber.receive();
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.4.3 result
MessageDispatch 高级特性
一 异步发送
AciveMQ支持异步和同步发送消息,是可以配置的。通常对于快的消费者, 是直接把消息同步发送过去,但对于一个Slow Consumer,你使用同步发送消息 可能出现Producer堵塞等现象,慢消费者适合使用异步发送
配置使用
1:ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是 Fast Consumer则使用dispatcheAsync=false
2:在Connection URI级别来配置使用AsyncSendcf= new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
3:在ConnectionFactory级别来配置使用AsyncSend ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
4:在Connection级别来配置使用AsyncSend ((ActiveMQConnection)connection).setUseAsyncSend(true);
二 严格的顺序
严格顺序分发策略(Strict Order Dispatch Policy) 通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。 Strict order dispatch policy 会保证每个topic consumer会以相同的顺序接收消 息,代价是性能上的损失。
以下是一个配置例子
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
对于Queue的配置为: <policyEntryqueue=">" strictOrderDispatch="false" />
三 轮询策略
轮询分发策略(Round Robin Dispatch Policy) ActiveMQ的prefetch缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大。而且缺省的dispatch policies会尝试尽可能快的填满prefetch缓冲。 然而在有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个别的 consumer上。这样就会因为负载的不均衡分配而导致处理时间的增加。
Round robin dispatch policy会尝试平均分发消息,以下是一个例子:
<policyEntrytopic="ORDERS.>">
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
Message 高级特性
一 基本属性
1:Queue的消息默认是持久化的。
2:消息的优先级默认是4。
3:消息发送时设置了时间戳。
4:消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略。
5:如果消息时重发的,将会标记出来。
6:JMSReplyTo标识响应消息发送到哪个Queue。
7:JMSCorelationID标识此消息相关联的消息id,可以用这个标识把多个消息连接起来。
8:JMS同时也记录了消息重发的次数,默认是6次。
9:如果有一组关联的消息需要处理,可以分组:只需要设置消息组的名字和这个消息是第几个消息。
10:如果消息中一个事务环境,则TXID将被设置。
11:此外ActiveMQ在服务器端额外设置了消息入列和出列的时间戳。
12:ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型。
二 Advisory 系统消息
Advisory Message是ActiveMQ自身的系统消息地址,可以监听该地址来获取activemq的系统信息。
2.1 系统消息
1:consumers, producers 和 connections的启动和停止
2:创建和销毁temporary destinations
3:topics 和 queues的消息过期
4:brokers 发送消息给 destinations,但是没有consumers
5:connections 启动和停止
2.2 打开Advisories
默认Advisory的功能是关闭的
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntrytopic=">" advisoryForConsumed="true" />
</policyEntries>
</policyMap>
</destinationPolicy>
2.3 关闭Advisories
1:<broker advisorySupport="false">
2:也可在Java中写:
BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
broker.start();
3:也可以在ActiveMQConnectionFactory上设置‘watchTopicAdvisories’ 属性
ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(); factory.setWatchTopicAdvisories(false);
4:也可在ConnectionURl上写: "tcp://localhost:61616?jms.watchTopicAdvisories=false"
2.4 Advisories的使用步骤
使用的方法和步骤:
1:要在配置文件里面开启Advisories
2:消息发送端没有变化
3:消息接收端:
- 根据你要接收的信息类型,来设置不同的topic,当然也可以使用AdvisorySupport这个类来辅助创建,比如你想要得到消息生产者的信息,你可以:
Topic d=session.createTopic("ActiveMQ.Advisory.Producer.Topic.MyTopic");
Topic d = session.createTopic("MyTopic");
Destination d2 = AdvisorySupport.getProducerAdvisoryTopic(destination);
(2)由于这个topic默认不是持久化的,所以应该先开启接收端,然后再发送topic信息
(3)接收消息的时候,接收到的消息类型是ActiveMQMessage,所以类型转换的时候,要转换成 ActiveMQMessage,然后再通过getDataStructure方法来得到具体的信息对象,如: if (message instanceof ActiveMQMessage) {
try {
ActiveMQMessagea Msg= (ActiveMQMessage) message;
ProducerIn foprod = (ProducerInfo) aMsg.getDataStructure();
System.out.println("count==="+aMsg.getProperty("producerCount"));
System.out.println("prodd==="+prod.getProducerId());
} catch (Exception e) {
e.printStackTrace();
}
}
三 延时与定时消息
有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想 让消息没隔一定时间投递一次,一共投递指定的次数,类似这种需求,ActiveMQ提供了一种broker 端消息定时调度机制。 我们只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按 照我们想要的行为去处理消息。当然需要在xml中配置schedulerSupport属性为true。
3.1 4个属性
1:AMQ_SCHEDULED_DELAY :延迟投递的时间
2:AMQ_SCHEDULED_PERIOD :重复投递的时间间隔
3:AMQ_SCHEDULED_REPEAT:重复投递次数
4:AMQ_SCHEDULED_CRON:Cron表达式
ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来 辅助设置,使用例子如:延迟60秒
MessageProducerproducer = session.createProducer(destination);
TextMessage message = session.createTextMessage("testmsg");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,time);
producer.send(message);
3.2 例子1
延迟30秒,投递10次,间隔10秒:
TextMessagemessage = session.createTextMessage("testmsg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
3.3 例子2
使用 CRON 表达式,每个小时发送一次
TextMessagemessage=session.createTextMessage("testmsg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数, 则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如 每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒: TextMessage message=session.createTextMessage("testmsg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,1000); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,1000); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
四 文件流传输(重要)
有些时候,我们需要传递Blob(Binary Large OBjects)消息,可以按照如下方式:
配置BLOB Transfer Policy,可以在发送方的连接URI上设置,如:
"tcp://192.168.232.128:61616?jms.blobTransferPolicy.uploadUrl=http://192.168.232.128:8171/fileserver/"
Sending BlobMessages,有几种方式:
1:如果你发送到的文件或者URL存在,比如发给共享文件系统或者是Web server上的web应 用,那么你可以使用如下方式:
BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com"); producer.send(message);
2:也可以在客户端动态的创建文件流,如下:
BlobMessage message = session.createBlobMessage(newFile("/foo/bar");
InputStreamin = ...;
BlobMessage message = session.createBlobMessage(in);
Receiving BlobMessages示例:
if (message instanceofBlobMessage) {
BlobMessageblobMessage= (BlobMessage) message;
InputStreamin = blobMessage.getInputStream();
// process the stream...
}
有时候需要在JMS provider内部进行message的转换。从4.2版本起,ActiveMQ 提供了一个MessageTransformer 接口用于进行消息转换,可以在如下对象上调 用:
ActiveMQConnectionFactory
ActiveMQConnection
ActiveMQSession
ActiveMQMessageConsumer
ActiveMQMessageProducer
在消息被发送到JMS provider的消息总线前进行转换。通过producerTransform 方法
在消息到达消息总线后,但是在consumer接收到消息前进行转换。通过 consumerTransform方法
当然MessageTransformer 接口的实现,需要你自己来提供。
Consumer 高级特性
一 Client导致消息重发
1:Client用了transactions,且在session中调用了rollback()
2:Client用了transactions,且在调用commit()之前关闭
3:Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()
二 ActiveMQConnectionFactory重发
1:collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时 才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15
2:maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示 不进行重传。默认值为6。
3:maximumRedeliveryDelay:最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首 次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连 时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。
4:initialRedeliveryDelay:初始重发延迟时间,默认1000L
5:redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L
6:useCollisionAvoidance:启用防止冲突功能,默认false
7:useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false
8:backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生
三 consumer重发策略
在接受的Client可以如下设置:
ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory( "failover:(tcp://192.168.1.106:61679,tcp://192.168.1.106:61819)?randomize=false");
RedeliveryPolicypolicy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
cf.setRedeliveryPolicy(policy);
当消息试图被传递的次数超过配置中maximumRedeliveries属性的值时,那么,broker会认 定该消息是一个死消息,并被把该消息发送到死队列中。 默认,aciaveMQ中死队列被声明 为“ActivemMQ.DLQ”,所有不能消费的消息都被传递到该死队列中。 你可以在 acivemq.xml中配置individualDeadLetterStrategy属性,示例如下:
<policyEntry queue= "> " >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix= "DLQ." useQueueForQueueMessages= "true" /> </deadLetterStrategy>
</policyEntry>
四 自动删除过期消息
有时需要直接删除过期的消息而不需要发送到死队列中,可以使用属性 processExpired=false来设置,示例如下:
<policyEntryqueue= "> " >
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired= "false" />
</deadLetterStrategy>
</policyEntry>
五 存放非持久消息到死队列中
默认情况下,Activemq不会把非持久的死消息发送到死队列中。非持久性如果你想 把非持久的消息发送到死队列中,需要设置属性processNonPersistent=“true”,示例如 下: <policyEntryqueue= "> " >
<deadLetterStrategy>
<sharedDeadLetterStrategyprocessNonPersistent= "true" />
</deadLetterStrategy>
</policyEntry>
六 消费端缓存
ActiveMQ通过Prefetch机制来提高性能,方式是在客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetch limit来控制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认,确认后的消息将会从缓存中去掉。 可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置 ActiveMQPrefetchPolicy对象来配置prefetchpolicy。
也可以通过connection options或者destination options来配置。例如: tcp://localhost:61616?jms.prefetchPolicy.all=50 tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
prefetchsize的缺省值如下:
1:persistent queues (default value: 1000)
2:non-persistent queues (default value: 1000)
3:persistent topics (default value: 100)
4:non-persistent topics (default value: Short.MAX_VALUE-1
样例代码 [email protected]:wornxiao/framwork_bymyself.git