Spring集成:从JMS读取 - >处理消息 - >坚持在DB

问题描述:

我使用Spring Integration从JMS读取消息,处理它们,然后使用我自己的dbPersistor的persist方法将其持久化到数据库,该方法返回void类型。我编写了一个测试用例来验证发布到JMS的消息是否成功保存在数据库中。我的SI和JMS配置都作为本次测试如下 - 在测试中,我做了以下Spring集成:从JMS读取 - >处理消息 - >坚持在DB

<int:poller fixed-delay="500" default="true"/> 

<int:channel id="inputChannel"> 
    <int:queue/> 
</int:channel> 
<int:channel id="errorChannel"> 
    <int:queue/> 
</int:channel> 
<jms:message-driven-channel-adapter id="jmsInboudAdapter"   
    connection-factory="connectionFactory" destination-name="MessageQueue" 
    channel="inputChannel" error-channel="errorChannel" transaction-manager="dbTxManager"          
    acknowledge="transacted"/> 

<int:chain id="handlerChain" input-channel="inputChannel"> 
    <int:service-activator ref="jmsMessageHandler" method="handleMessage" /> 
    <int:service-activator ref="dbPersistor" method="persist" /> 
</int:chain> 

然后 -

  • jmsTemplate.send()
  • verifyMessageWasPersistedToDB

这当我只向数据库发布一条消息时效果很好。但是,当我通过jmsTemplate.send()循环发布多条消息时,主线程完成操作,同时SI线程仍在执行,并尝试验证数据库中的消息,并失败,因为某些消息尚未保存。我的问题是 -

  1. 如何使主线程等待SI线程完成,然后调用验证方法?
  2. 如果出现db异常和回滚,我如何验证失败的消息是否回到原始队列中?

由于 AJ

  1. inputChannel不应该是一个队列通道 - 时被插入到队列中的消息中的JMS事务将提交 - 数据库交易不会在的范围来执行JMS事务。你必须使用直接渠道(删除轮询和<queue/>)。见the documentation on transactions in Spring Integration

  2. 您将不得不查询结果的数据库;你可能会添加一个拦截器和一些CountDownLatch,但只需要轮询数据库,直到结果出现或者一段时间结束为止。

+0

谢谢Gary,将'inputChannel'改为'DirectChannel'帮助我解决了事务管理问题。现在失败的消息停留在后端消息存储中。 但我的第一个问题仍然存在 - 我如何让主线程在继续执行其余代码之前等待SI链完成执行?目前,我在调用主线程中的其余代码之前使用了一个'Thread.sleep()'。 –

+0

正如我在#2中所说的,您不能“轻松”地让测试线程等待 - 您必须将其他组件添加到流程中。直到预期结果显示(或者某个时间限制到期,表示测试失败),继续轮询数据库会更容易。 –

+0

我明白你的观点。我将采用在流中添加CountDownLatch的方法。感谢所有的答复和解释加里! –