fork-join有工作时没有工作线程吗?

问题描述:

我有一个错误,在生产中出现了两次,其中一个fork/join池停止工作,即使它有工作要做,并且正在添加更多工作。fork-join有工作时没有工作线程吗?

这是我到目前为止解释为什么要完成的任务队列已满并且任务结果流停止的结论。我有线程转储,其中我的任务生产者线程正在等待fork/join提交完成,但没有任何ForkJoinPool工作线程正在执行任何操作。

"calc-scheduling-pool-4-thread-2" #65 prio=5 os_prio=0 tid=0x00000000102e39f0 nid=0x794a in Object.wait() [0x00002ad900a06000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334) 
    - locked <0x000000061ad08708> (a com.....Engine$Calculation) 
    at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:391) 
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719) 
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:2613) 
    at com...Engine.calculateSinceLastBatch(Engine.java:141) 

不管我在做什么,这不应该发生吗?线程转储是在检测到初始条件后的几个小时。在运行时我有两个其他的ForkJoinPools,它们都运行正常,并且存在很多工作线程。

这个池的并行性是1(我知道这很愚蠢,但不应该破坏fork/join池的正确性)。除非我的任务队列填满并且线程转储显示没有工作者,否则不会检测到其他错误或异常。

有没有其他人看到过这个?要么我错过了某些东西,或者在fork/join中有一个从未(重新)为我启动工作线程的错误。

运行时是用java 8

更新代码

这是我们如何使用叉子/加入合理的生产简化。我们有三个引擎,只有其中一个是配置了1

import java.util.*; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.*; 

public class Engine { 

    BlockingQueue<Calculation> externalQueue = new LinkedBlockingQueue<>(100000); 
    ScheduledExecutorService scheduling = Executors.newScheduledThreadPool(3); 
    static ForkJoinPool forkJoin = new ForkJoinPool(1); 

    public static void main(String[] args) { 
     new Engine().start(); 
    } 

    void start() { 
     final AtomicInteger batch = new AtomicInteger(0); 
     // data comes in from external systems 
     scheduling.scheduleWithFixedDelay(
       () -> produceData(batch.getAndIncrement()), 
       500, 
       500, 
       TimeUnit.MILLISECONDS); 
     // internal scheduling processes data with a fixed delay 
     scheduling.scheduleWithFixedDelay(
       this::calculate, 
       1000, 
       1000, 
       TimeUnit.MILLISECONDS); 
    } 

    void produceData(final int batch) { 
     System.out.println(Thread.currentThread().getName() + " => submitting data for batch " + batch); 
     Stream<Integer> data = IntStream.range(0, 10).boxed(); 
     data.map((i) -> new Calculation(batch, i)).forEach(externalQueue::offer); 
    } 

    void calculate() { 
     int available = externalQueue.size(); 
     List<Calculation> tasks = new ArrayList<>(available); 
     externalQueue.drainTo(tasks); 
     // invoke will block for the results to be calculated before continuing 
     forkJoin.invoke(new CalculationTask(tasks, 0, tasks.size())); 
     System.out.println("done with calculations at " + new Date()); 
    } 

    static class CalculationTask extends RecursiveAction { 

     static int MIN_CALCULATION_THRESHOLD = 3; 

     List<Calculation> tasks; 
     int start; 
     int end; 

     CalculationTask(List<Calculation> tasks, int start, int end) { 
      this.tasks = tasks; 
      this.start = start; 
      this.end = end; 
     } 

     // if below a threshold, calculate here, else fork to new CalculationTasks 
     @Override 
     protected void compute() { 
      int work = end - start; 
      if (work <= threshold()) { 
       for (int i = start; i < end; i++) { 
        Calculation calc = tasks.get(i); 
        calc.calculate(); 
       } 
       return; 
      } 

      invokeNewActions(); 
     } 

     int threshold() { 
      return Math.max(tasks.size()/forkJoin.getParallelism()/2, MIN_CALCULATION_THRESHOLD); 
     } 

     void invokeNewActions() { 
      invokeAll(
        new CalculationTask(tasks, start, middle()), 
        new CalculationTask(tasks, middle(), end)); 
     } 

     int middle() { 
      return (start + end)/2; 
     } 
    } 

    static class Calculation { 

     int batch; 
     int data; 

     Calculation(int batch, int data) { 
      this.batch = batch; 
      this.data = data; 
     } 

     void calculate() { 
      // does some work and pushes results to a listener 
      System.out.println(Thread.currentThread().getName() + " => calculation complete on batch " + batch 
          + " for " + data); 
     } 
    } 

} 
+0

什么是队列?什么版本,Java7或8?一些代码也可能有帮助。 – edharned

+0

对不起,可能还不清楚。我有大量的任务进入队列,每隔一段时间排空并给fork/join分解并执行。 生产环境是Java 8.我可以尝试提供一些代码,但我怀疑当它归结为我们如何使用fork/join时,它将看起来像是其中一个教程。 –

+2

你试过设置大于1的并行吗?我知道这不是你正在寻找的答案,但是如果没有重现的测试,就不可能有人会有一个可靠的答案。你的假设是正确的,过去有过奇怪的事情。 –

等待是java.util.concurrent.ForkJoinTask.externalAwaitDone并行(ForkJoinTask.java:334)

这告诉我F/J可能会将您的提交线程用作工作者。 按照来自invokeAll的代码。任务提交执行后,代码需要Future,并且以 ((ForkJoinTask)futures.get(i))结束。quietlyJoin();静静地加入 加入。

在那里,如果(Thread.currentThread())instanceofForkJoinWorkerThread)如果池使用您的提交线程作为工作者,它将不会是true,它将在externalAwaitDone()结束。

问题可能是您的提交线程永远不会醒来,因为它不是真正的工作者。使用提交线程作为工作者有很多问题,这可能是另一个问题。

As @ John-Vint说,没有测试,这个答案只是一个猜测。为什么不把并行度设置为> 1并且完成它。