的ThreadPoolExecutor:从取消的invokeAll任务时,一个任务返回错误
问题描述:
我有一个线程池执行者做同样的操作分批未来密钥列表()。所以我使用invokeall()方法来处理批次中的键列表。该用例是这样的,如果批处理中的任何任务返回错误,则没有必要继续处理其他密钥。因此,的ThreadPoolExecutor:从取消的invokeAll任务时,一个任务返回错误
- 我该如何取消批处理执行任务,一旦任务重新调用错误。
- 但不影响另一批按键执行。即每批应该取消注销。
感谢您的帮助。
答
我看不出这可怎么没有一点定制来完成。我能想出的最简单的实现需要:
- 一个专门的未来实现基本的FutureTask子类,覆盖setException()方法以取消所有其他任务时,任务抛出一个异常
- 专门ThreadPoolExecutor实施将覆盖的invokeAll()尽量使用自定义的未来
的它是这样的:
自定义未来:
import java.util.Collection;
import java.util.concurrent.*;
public class MyFutureTask<V> extends FutureTask<V> {
private Callable<V> task;
private Collection<Future<V>> allFutures;
public MyFutureTask(Callable<V> task, Collection<Future<V>> allFutures) {
super(task);
this.task = task;
this.allFutures = allFutures;
}
@Override
protected void setException(Throwable t) {
super.setException(t);
synchronized(allFutures) {
for (Future<V> future: allFutures) {
if ((future != this) && !future.isDone()) {
future.cancel(true);
}
}
}
}
}
自定义线程池:
import java.util.*;
import java.util.concurrent.*;
public class MyThreadPool extends ThreadPoolExecutor {
public MyThreadPool(int size) {
super(size, size, 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
List<Future<T>> futures = new ArrayList<>(tasks.size());
for (Callable<T> callable: tasks) {
futures.add(new MyFutureTask<>(callable, futures));
}
for (Future<T> future: futures) {
execute((MyFutureTask<T>) future);
}
for (Future<T> future: futures) {
try {
future.get();
} catch (ExecutionException|CancellationException e) {
// ignore this exception
}
}
return futures;
}
}
代码示例来测试它:
import java.util.*;
import java.util.concurrent.*;
public class TestThreadPool {
public static void main(final String[] args) {
ExecutorService executor = null;
try {
int size = 10;
executor = new MyThreadPool(size);
List<Callable<String>> tasks = new ArrayList<>();
int count=1;
tasks.add(new MyCallable(count++, false));
tasks.add(new MyCallable(count++, true));
List<Future<String>> futures = executor.invokeAll(tasks);
System.out.println("results:");
for (int i=0; i<futures.size(); i++) {
Future<String> f = futures.get(i);
try {
System.out.println(f.get());
} catch (CancellationException e) {
System.out.println("CancellationException for task " + (i+1) +
": " + e.getMessage());
} catch (ExecutionException e) {
System.out.println("ExecutionException for task " + (i+1) +
": " + e.getMessage());
}
}
} catch(Exception e) {
e.printStackTrace();
} finally {
if (executor != null) executor.shutdownNow();
}
}
public static class MyCallable implements Callable<String> {
private final int index;
private final boolean simulateFailure;
public MyCallable(int index, boolean simulateFailure) {
this.index = index;
this.simulateFailure = simulateFailure;
}
@Override
public String call() throws Exception {
if (simulateFailure) {
throw new Exception("task " + index + " simulated failure");
}
Thread.sleep(2000L);
return "task " + index + " succesful";
}
}
}
和执行测试的最后结果,正如显示在输出控制台:
results:
CancellationException for task 1: null
ExecutionException for task 2: java.lang.Exception: task 2 simulated failure
答
-
传递
ExecutorService
的参考每个任务波纹管:ExecutorService eServ = Executors.newFixedThreadPool(10); Set<Callable<ReaderThread>> tasks = new HashSet<Callable<ReaderThread>>(); for (int i = 0; i < 10 ; i++) { tasks.add(new ReaderThread(eServ)); } List<Future<ReaderThread>> lt = eServ.invokeAll(tasks);
-
如果任务是错误然后调用
shutdownNow()
那么将停止所有的任务public ReaderThread call() throws Exception { try { for (int i = 1; i < 50; i++) { System.out.println("i="+i+"::"+Thread.currentThread()); Thread.sleep(1000); if (i == 10 && Thread.currentThread().toString().equals("Thread[pool-1-thread-7,5,main]")) { throw new Exception(); } } } catch (Exception exc) { ex.shutdownNow(); } return this; }