如何将DeferredResult与ListenableFuture列表结合使用?
问题描述:
我有一个番石榴ListenableFuture
实例和Spring DeferredResult
的列表。我想为列表中第一个成功的未来设定结果,或者如果超时还没有到期,还没有从所有期货中获得成功的结果。这里是我的尝试:如何将DeferredResult与ListenableFuture列表结合使用?
DeferredResult<String> foo() {
DeferredResult<String> result = new DeferredResult<>(3000L);
List<String> resps = newArrayList();
List<ListenableFuture<String>> fList = ...
fList.forEach(f -> Futures.addCallback(f, new FutureCallback<String>() {
@Override
public void onSuccess(String resp) {
resps.add(resp);
}
@Override
public void onFailure(Throwable t) {
// NOP
}
}));
ListenableFuture<List<String>> f0 = Futures.successfulAsList(fList);
Futures.addCallback(f0, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> r) {
if (!result.hasResult()) {
result.
if (r != null) {
result.setResult(...);
}
}
}
@Override
public void onFailure(Throwable t) {
// NOP
}
});
return result;
}
我的代码不能正常工作,因为它等待来自所有期货的结果,但我需要相对于DeferredResult超时最快的结果。我该如何解决它?
答
DeferredResult<String> foo() {
DeferredResult<String> result = new DeferredResult<>(3000L);
List<ListenableFuture<String>> fList = ...
ExecutorService serializingExecutor = Executors.newSingleThreadExecutor();
fList.forEach(
f ->
Futures.addCallback(
f,
new FutureCallback<String>() {
@Override
public void onSuccess(String value) {
if (!result.hasResult()) {
result.setResult(value);
}
}
@Override
public void onFailure(Throwable throwable) {}
},
// To avoid race in the callback:
serializingExecutor));
return result;
}
答
这确实返回包装对象与期货的结果管理超时并指示是否全部完成(allFinished
属性)之前完成的列表。
在您的处理代码中,您可以检查是否全部完成,并返回整个列表或只是第一个项目(或者没有,如果没有未来完成)。
DeferredResult<ResultWrapper> foo() {
DeferredResult<ResultWrapper> result = new DeferredResult<>(3000L);
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
List<ListenableFuture<String>> fList = IntStream.range(1, 1000)
.mapToObj(i -> executor.submit(() -> String.valueOf(i))).collect(Collectors.toList());
List<String> resps = Lists.newArrayListWithCapacity(fList.size());
Object lock = new Object();
fList.forEach(f -> Futures.addCallback(f, new FutureCallback<String>() {
@Override
public void onSuccess(String resp) {
synchronized (lock) {
resps.add(resp);
result.setResult(new ResultWrapper(
resps.size() == fList.size(),
ImmutableList.copyOf(resps)));
}
}
@Override
public void onFailure(Throwable t) {
// NOP
}
}));
return result;
}
private static class ResultWrapper {
private boolean allFinished;
private List<String> list;
public ResultWrapper(boolean allFinished, List<String> list) {
this.allFinished = allFinished;
this.list = list;
}
public boolean isAllFinished() {
return allFinished;
}
public List<String> getList() {
return list;
}
}
请解释一下这是干什么的。你是如何实现的?我想为列表中的第一个成功的未来设定结果,或者如果超时还没有到期,还没有从所有期货中获得成功的结果?对'hasResult'的调用是不必要的。如果结果已经设置,'setResult'返回'false'。 –