Java异步处理

问题描述:

我目前正在开发一个使用异步处理分配的系统。信息传输是使用队列完成的。所以一个进程会将信息放入Queue(并终止),另一个进程会将其提交并处理。我的实现使我面临许多挑战,我对每个人的方法对这些问题感兴趣(在架构和库方面)。Java异步处理

让我画这幅画。比方说你有三个过程:

Process A -----> Process B 
         | 
Process C <-----------| 

所以进程A把一个消息队列和结束,进程B拾取消息,处理它,并把它放在一个“回”队列。 进程C接收消息并对其进行处理。

  1. 如何一个手柄方法B不听或处理消息从队列?是否有一些JMS类型的方法阻止生产者在消费者不活跃时提交消息?所以进程A将提交但抛出异常。
  2. 比方说工艺C必须获得在X分钟内答复,但进程B已停止(因任何原因),有没有一些强制执行的队列超时机制?所以保证在X分钟内回复,将启动进程C

所有这些事情都可以使用死信来处理某种排队吗?我应该用定时器手动完成这一切,并检查。我提到过JMS,但我对任何事情都很开放,实际上我使用Hazelcast作为队列。

请注意,这是更多的一个架构问题,在可用的Java技术和方法方面,我确实认为这是一个适当的问题。

任何建议将不胜感激。

感谢

+0

你看过阿卡吗?演员听起来像是你的情况的理想解决方案。尽管akka在scala中很流行,但它也适用于java。 – Albert 2012-02-03 08:19:49

+0

我会看看阿卡。感谢大家的解决方案。 – Paul 2012-02-03 08:38:52

恕我直言,最简单的解决方案是使用ExecutorService或基于执行程序服务的解决方案。这支持一个工作队列,计划任务(用于超时)。

它也可以在一个单独的过程中工作。 (我相信Hazelcast支持分布式ExecutorService)

在我看来,那你要问的问题的类型是“闻香”是队列和异步处理可能不适合您的情况最好的工具。

1)失败了排队的目的。听起来就像你需要一个同步的请求 - 响应过程。

2)进程C一般不会得到回复。它从队列中获取消息。如果队列中有消息,并且进程C已准备就绪,那么它会得到它。例如,流程C可以决定消息一旦得到就失效了。

+0

我看到你在说什么,我想你可能是对的。谢谢 – Paul 2012-02-03 06:59:53

那么,排队的一点就是让事情保持独立。

如果你没有被困在任何特定的技术上,你可以为你的队列使用数据库。

但首先,确保两个进程协调的简单机制是使用套接字。如果可行的话,简单地让进程B在一些熟知的端口上创建一个开放的套接字监听器,然后进程A将连接到该套接字并监视它。如果进程B永远消失了,进程A可以告诉,因为他们的插座得到关机,并且可以使用,作为对使用过程B.

问题对于B警报 - > C题,有一个数据库表:

create table queue (
    id integer, 
    payload varchar(100), // or whatever you can use to indicate a payload 
    status varchar(1), 
    updated timestamp 
) 

然后,进程A将其条目放入队列中,当前时间和状态为“B”。 B,听队列:

select * from queue where status = 'B' order by updated 

当B完成时,它更新队列以将状态设置为“C”。

同时,“C”是轮询分贝:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 

(含但是阈值是多久,你想要的东西腐烂队列)。

最后,C将队列行更新为'D'完成,或删除它,或者任何你喜欢的。

黑暗的一面是在这里有一些竞争条件,C可能会试图抓住一个条目,而B刚刚启动。你可以通过严格的隔离级别和锁定来完成。简直如下:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 
FOR UPDATE 

也使用FOR UPDATE进行B的选择。通过这种方式,赢得选择比赛的人将在该排上获得排他性锁定。

这会让你在实际功能方面走得很远。

您正期待异步(消息)设置的同步处理的语义是不可能的。我曾参与WebSphere MQ,通常当消费者死亡时,消息永远保留在队列中(除非设置过期)。一旦队列达到其深度,后续消息将被移至死信队列。

我已经使用类似的方法来创建视频代码转换作业的排队和处理系统。基本上,它的工作方式是:

  1. Process A职位“日程安排”消息Arbiter Q,它增加了工作到它的“等待”队列。
  2. Process B请求从Arbiter Q请求下一个作业,该作业将删除其“等待”队列中的下一个项目(受一些自定义调度逻辑的限制,以确保单个用户无法泛滥转码请求并阻止其他用户转码视频),并在将作业返回到Process B之前插入其“处理”集。当它进入“处理”集合时,作业被加时间戳。
  3. Process B完成作业并发送一条“完整”消息至Arbiter Q,该消息将作业从“处理”集合中删除,然后修改某个状态,以便Process C知道作业已完成。
  4. Arbiter Q定期检查其“处理”集中的作业,并超时运行异常长时间的任何作业。如果需要,Process A然后可以*尝试重新排队相同的作业。

这是使用JMX实现的(JMS本来更合适,但我离题了)。Process A只是响应用户启动的转码请求的servlet线程。 Arbiter Q是一个MBean单例(在服务器集群中的所有节点上持久/复制),它们收到“时间表”和“完整”消息。其内部管理的“队列”只是List实例,当作业完成时,它会修改应用程序数据库中的值以引用转码后的视频文件的URL。 Process B是转码线程。它的工作仅仅是要求工作,对其进行转码,然后在完成时进行报告。一遍又一遍,直到时间结束。 Process C是另一个用户/ servlet线程。它会看到该URL可用,并向用户呈现下载链接。

在这种情况下,如果Process B已经死亡,那么工作将永远坐在“等待”队列中。然而,实际上,这从未发生过。如果您的Process B没有运行/正在执行它应该做的事情,那么我认为这会在您的部署/配置/实施Process B中提出一个问题,而不是您整体方法中的问题。

我想你的第一个问题已经被其他海报充分回答了。

在第二个问题上,根据应用程序使用的消息传递引擎,您尝试执行的操作可能是可能的。我知道这适用于IBM MQ。我已经看到使用WebSphere MQ Classes for Java而不是JMS来完成此操作。它的工作方式是当进程A在队列中放入消息时,它指定了等待响应消息的时间。如果进程A未能在指定的时间内收到响应消息,则系统会引发相应的异常。

我不认为在JMS中有一种标准的方式来处理请求/响应超时,因此您可能必须使用特定于平台的类,例如WebSphere MQ Classes for Java。