java:组合多线程/单线程任务队列

问题描述:

我喜欢ExecutorService系列的类/接口。我不必担心线程;我收集一个ExecutorService实例并用它来安排任务,如果我想要使用8线程或16线程池,那么非常好,我根本不必担心这个问题,它只是依赖于如何设置ExecutorService。欢呼!java:组合多线程/单线程任务队列

但是,如果我的一些任务需要按连续顺序执行,我该怎么办?理想情况下,我会要求ExecutorService让我在单个线程上安排这些任务,但似乎没有任何方法这样做。

编辑:的任务不会提前知道的,他们是一个无限系列是不稳定的各种事件(认为随机/未知的到来过程中产生的任务:如盖革计数器的点击,或击键事件)。

+0

出于单线程事件的目的,我有一个单独的单线程执行程序。这意味着我有可能再使用一条线索,但我不相信这种影响是如此之高。 – 2010-01-28 21:01:40

+0

可能的重复:http://*.com/questions/2153663/controlling-task-execution-order-with-executorservice – finnw 2010-01-28 22:50:00

+0

@finnw,你有一个点。不太确定它是否相同(我试图限制自己使用一个可能是多线程的ExecutorService)。 – 2010-01-28 23:06:15

你可以编写一个Runnable的实现,它接受一些任务并连续执行它们。

喜欢的东西:

public class SerialRunner implements Runnable { 
    private List<Runnable> tasks; 

    public SerialRunner(List<Runnable> tasks) { 
     this.tasks = tasks; 
    } 

    public void run() { 
     for (Runnable task: tasks) { 
      task.run(); 
     } 
    } 
} 
+0

这不起作用,这些任务是一系列无限制的任务,一次排队。 – 2010-01-28 19:45:44

+0

在这种情况下,如果它们的顺序对于执行的其他任务并不重要,请使用'Executors.newSingleThreadExecutor()' – danben 2010-01-28 19:52:19

+0

+1:这个答案不是我想要的,但它给了我一个想法...... – 2010-01-28 20:01:27

我使用的是Executors.newSingleThreadExecutor()的,我要排队,只运行一次一个任务,创建一个独立的执行者。 另一种方法是刚刚组成几个任务,并提交一个,

executor.submit(new Runnable() { 
    public void run() { 
     myTask1.call(); 
     myTask2.call(); 
     myTask3.call(); 
    }}); 

虽然你可能需要更复杂的,如果仍想myTask2运行即使myTask1抛出异常。

+0

使用一个单独的执行程序可以工作,但如果可能的话,这是一个我想避免的限制。 – 2010-01-28 19:45:02

我这样做的方式是通过一些本地代码,根据任务所说的关键是什么(这可以是完全任意的或有意义的值)将流式传输到不同的线程上。除了提供给Queue并让其他线程关闭它(或者在您的案例中使用ExecutorService进行工作并使该服务维护一个线程池以启动内部工作队列),您可以提供Pipelineable(又名一个任务)到PipelineManager,该任务为该任务的密钥找到正确的队列并将任务粘贴到该队列中。还有其他一些代码管理线程,这些代码用于管理排队队列,以确保始终有1个线程取下队列,以确保为同一个键提供的所有工作将以串行方式执行。

使用这种方法,您可以轻松地为n组串行工作留出某些密钥,同时对剩余的密钥进行循环移动以便按照任何旧顺序进行工作,或者可以通过审慎保留某些管道(线程)关键选择。

这种做法是不是对JDK ExecutorService实施可行的,因为它们是由一个单一的BlockingQueue(至少ThreadPoolExecutor的)支持,因此没有办法说“做这个工作在任何旧秩序,但这项工作必须序列化”。我假设你想要这样做以保持吞吐量,否则按照Danben的评论将所有内容粘贴到singleThreadExecutor

(编辑)

你能做什么,而不是,要保持相同的抽象,是创建创建自己的实现的ExecutorService与会代表的多个实例ThreadPoolExecutor(或相似)你需要的; 1支持n个线程和1个或多个单线程实例。类似下面(这绝不是在所有的工作代码,但希望你的想法!)

public class PipeliningExecutorService<T extends Pipelineable> implements ExecutorService { 
    private Map<Key, ExecutorService> executors; 
    private ExecutorService generalPurposeExecutor; 

    // ExecutorService methods here, for example 
    @Override 
    public <T> Future<T> submit(Callable<T> task) { 
     Pipelineable pipelineableTask = convertTaskToPipelineable(task); 
     Key taskKey = pipelineable.getKey(); 
     ExecutorService delegatedService = executors.get(taskKey); 
     if (delegatedService == null) delegatedService = generalPurposeExecutor; 
     return delegatedService.submit(task); 
    } 
} 

public interface Pipelineable<K,V> { 
    K getKey(); 
    V getValue(); 
} 

这是非常丑陋的,为了这个目的,该ExecutorService方法是通用的,而不是服务本身,这意味着你需要一些标准的方式来编组传递给Pipelineable和后备的任何传递,如果你不能(例如把它扔到通用池中)。

嗯,我想到了一些东西,不太确定这是否可行,但也许它会(未经测试的代码)。这略过了微妙之处(异常处理,取消,对底层执行者的其他任务的公平性等),但也许是有用的。

class SequentialExecutorWrapper implements Runnable 
{ 
    final private ExecutorService executor; 

    // queue of tasks to execute in sequence 
    final private Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); 

    // semaphore for pop() access to the task list 
    final private AtomicBoolean taskInProcess = new AtomicBoolean(false); 

    public void submit(Runnable task) 
    { 
     // add task to the queue, try to run it now 
     taskQueue.offer(task); 
     if (!tryToRunNow()) 
     { 
      // this object is running tasks on another thread 
      // do we need to try again or will the currently-running thread 
      // handle it? (depends on ordering between taskQueue.offer() 
      // and the tryToRunNow(), not sure if there is a problem) 
     } 
    } 

    public void run() 
    { 
     tryToRunNow(); 
    } 

    private boolean tryToRunNow() 
    { 
     if (taskInProcess.compareAndSet(false, true)) 
     { 
      // yay! I own the task queue! 
      try { 
       Runnable task = taskQueue.poll(); 
       while (task != null) 
       { 
        task.run(); 
        task = taskQueue.poll(); 
       } 
      } 
      finally 
      { 
       taskInProcess.set(false); 
      } 
      return true; 
     } 
     else 
     { 
      return false; 
     } 
    }