Java的parallelStream()与自定义池与呼叫者工作窃取?

Java的parallelStream()与自定义池与呼叫者工作窃取?

问题描述:

正常情况下,当使用Java 8的parallelStream()时,结果是通过默认的公共fork-join池(即ForkJoinPool.commonPool())执行。Java的parallelStream()与自定义池与呼叫者工作窃取?

但是,这显然是不可取的,但是,如果有一项工作远离CPU限制,例如,可能会在很多时候等待IO。在这种情况下,人们会想要使用一个单独的池,根据其他标准来确定大小(例如,多长时间的任务可能实际使用CPU)。

有没有明显获取parallelStream()使用不同的池的方法,但有一种方法,如详细here

不幸的是,该方法需要从fork-join池线程调用并行流的终端操作。这样做的缺点是,如果目标分支连接池完全忙于现有工作,那么整个执行过程就会等待,而完全没有任何作用。因此该池可能成为比单线程执行更糟的瓶颈。相比之下,当以“普通”方式使用parallelStream()时,将使用ForkJoinPool.common.externalHelpComplete()或ForkJoinPool.common.tryExternalUnpush()来让池外的调用线程帮助处理。

有谁知道的方式来得到parallelStream()使用非默认的fork-join池有一个调用线程从的fork-join池的帮助之外在这项工作中的处理(但不是fork-join池的其余部分)?

+3

我不明白你的_这个缺点是,如果目标叉加入池完全忙于现有的work_。你不会为这个并行流调用创建一个新的池吗? –

+1

更糟。当你在任务中调用'get'而不是在公共池中时,它仍然会调用'ForkJoinPool.common.tryExternalUnpush()',但是当然不会在公共池队列中找到任务。 – Holger

+0

要回答这个问题,不,我不会为这个调用创建一个新的线程池。相反,我会在许多类似的调用*享另一个线程池,其中一些可能会重叠,其中一些可能比其他更长/更大的任务等。 –

您可以在泳池上使用awaitQuiescence来帮忙。然而,你不能选择你将要帮助的任务,它只会从池中取出下一个待处理的任务,因此,如果有更多的待处理任务,你可能会在完成你自己的任务之前结束执行。

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) // use zero timeout to execute one task only 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

将打印true

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// overload: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

将永远挂,因为它试图执行第二封堵任务。不过,只要没有无限的任务(上面的例子是极端的,只是为了演示),它会让启动线程帮助处理池的待处理任务,这将提高自己的任务执行的机会。


但要注意,在叉的全部关系/ join框架和Stream API是一个实现细节反正。

+0

我已经得出结论说我可以这样做,但正如你所指出的那样,这很可能意味着我最终将帮助完成其他任务,而不是帮助自己完成任务。这是一种非首发。 此外,我得到fork-join是一个实现细节,但需要更好地控制parallelStream(),例如,像parallelStream(forkJoinPool)一样简单。 –

+1

好吧,用'parallelStream(ForkJoinPool)'这样的方法,它不再是一个实现细节了...... – Holger

+0

不,但是没有参数的parallelStream()会做什么仍然是一个实现细节。这只会给你一个选择,在你需要的情况下进行一些控制。 –