WorkerThread:等待处理完成(BlockingQueue)
问题描述:
即时构建一个多线程应用程序,使用WorkerThreads处理来自BlockingQueues的任务。 worker看起来像follws(作为抽象类,子类实现processItem())。WorkerThread:等待处理完成(BlockingQueue)
abstract class WorkerThread extends Thread {
BlockingQueue<Task> q;
int tasksInSystem; // globally available
public void run() {
while(!interrupted()) {
Task t = q.take();
process(t);
tasksInSystem--;
}
}
abstract void process(Task t);
}
特别的是,我想等待所有任务完成。 我的第一个想法是:
- 数每增加一个任务
- 降低用计数器处理完成后。
但是: 但也有不同的任务,不同的工人实现和多队列。所以我将不得不维护大量不同的柜台。
我想什么有:
q.waitForEmptyAndCompleted()
这需要排队跟踪“飞行”的任务,并要求工作进程,当他们完成信号(而不是tasksInsystem---;
)。
工作人员无法增加该计数器,因为他必须在将他们从队列中取出之后对其进行计数。但是另一个线程可能会在take()调用之后立即运行,以至于工作人员无法事先增加计数器。
因此,计数器增加和take()必须连在一起(atomar)。这导致我到一个专门的BlockingQueue。
我没有找到预先制定的解决方案。所以我最好的猜测是实现我自己的BlockingQueue。是否有我可以使用的东西(为了避免自己实现并测试线程安全的阻塞队列)?或者你有什么想法以不同的方式实施这种等待电话?
答
好的,因为一般ExecutorService
是不够的,或许ForkJoinPool
将工作,它不显式公开队列,但应该是非常容易使用给你所描述的。
关键方法是awaitQuiescence(long timeout, TimeUnit unit)
它将等待,直到所有提交的任务已完成执行。
你可以使用'ExecutorService'并调用shutdown吗? Executor服务将管理所有任务,您需要将队列与它关联。关机将阻止,直到所有任务完成。 – hgrey
嘿,谢谢你的回答。我接近实际使用ExecutorService。但是当我调用shutdown()时,整个服务停止。我实际上想重用队列。 waitForEmptyAndComplete()后发生的事情将触发新的任务。所以我必须为每一轮设置一个新的ExecutorService。至少,它不会强迫我实现我自己的Queue-Variant。 – markus