如何知道一组RabbitMQ任务何时完成?
我正在使用RabbitMQ让工作进程编码视频文件。我想知道所有文件何时完成 - 即所有工作进程何时完成。如何知道一组RabbitMQ任务何时完成?
我能想到的唯一方法就是使用数据库。当视频编码完成:
UPDATE videos SET status = 'complete' WHERE filename = 'foo.wmv'
-- etc etc etc as each worker finishes --
然后检查是否所有的影片已编码:
SELECT count(*) FROM videos WHERE status != 'complete'
但是如果我要做到这一点,那么我觉得我我失去了RabbitMQ作为多个分布式工作进程机制的好处,因为我仍然需要手动维护一个数据库队列。
有没有RabbitMQ依赖关系的标准机制?也就是说,“等待这5项任务完成,一旦完成,然后开始一项新任务?”
我不希望有一个父进程将这些任务添加到队列,然后“等待”每个人返回一个“完成”的状态。然后,我必须为每组视频保留一个单独的流程,此时与独立的ThreadPool概念相比,我已经失去了解耦的工作流程的优势。
我在问一些不可能的事吗?或者,是否有标准的广泛采用的解决方案来管理我错过的队列中的任务总体状态?
编辑:搜索之后,我发现这个类似的问题:Getting result of a long running task with RabbitMQ
有什么特别的想法,人们对这个?
使用“响应”队列。我不知道RabbitMQ的任何细节,所以这是一般:
- 有你的父进程发送请求和跟踪它发送多少
- 让父进程也等待一个特定的响应队列(孩子们知道)
- 每当孩子完成的东西(或不能完成出于某种原因),将消息发送到响应队列
- 每当
numSent == numResponded
,大功告成
需要记住的是超时 - 如果子进程死亡会发生什么?你必须做更多的工作,但基本上:
- 随着每个发送的消息,包括某种类型的ID,并将该ID和当前时间添加到散列表。
- 对于每一个反应,从哈希表中删除ID
- 定期走哈希表,并删除任何已超时
这就是所谓的Request Reply Pattern。
我已经实现了一个工作流程状态机作为一系列队列实现的工作流程。工作人员在一个队列上接收消息,处理该工作,然后将相同的消息发布到另一个队列中。然后另一种类型的工作进程接收该消息等。
就你而言,听起来像你需要实现Enterprise Integration Patterns(这是一本免费的在线书籍)中的一种模式,并且有一个简单的工作人员收集消息直到完成一组工作,然后将单个消息处理为代表工作流中下一步的队列。
赞成引用模式名称。 –