如何将DeferredResult与ListenableFuture列表结合使用?

问题描述:

我有一个番石榴ListenableFuture实例和Spring DeferredResult的列表。我想为列表中第一个成功的未来设定结果,或者如果超时还没有到期,还没有从所有期货中获得成功的结果。这里是我的尝试:如何将DeferredResult与ListenableFuture列表结合使用?

DeferredResult<String> foo() { 
    DeferredResult<String> result = new DeferredResult<>(3000L); 

    List<String> resps = newArrayList(); 
    List<ListenableFuture<String>> fList = ... 

    fList.forEach(f -> Futures.addCallback(f, new FutureCallback<String>() { 
     @Override 
     public void onSuccess(String resp) { 
      resps.add(resp); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      // NOP 
     } 
    })); 

    ListenableFuture<List<String>> f0 = Futures.successfulAsList(fList); 

    Futures.addCallback(f0, new FutureCallback<List<String>>() { 
     @Override 
     public void onSuccess(List<String> r) { 
      if (!result.hasResult()) { 
       result. 
        if (r != null) { 
         result.setResult(...); 
        } 
       } 
      } 

      @Override 
      public void onFailure(Throwable t) { 
       // NOP 
      } 
     }); 

    return result; 
} 

我的代码不能正常工作,因为它等待来自所有期货的结果,但我需要相对于DeferredResult超时最快的结果。我该如何解决它?

DeferredResult<String> foo() { 
    DeferredResult<String> result = new DeferredResult<>(3000L); 

    List<ListenableFuture<String>> fList = ... 

    ExecutorService serializingExecutor = Executors.newSingleThreadExecutor(); 
    fList.forEach(
     f -> 
      Futures.addCallback(
       f, 
       new FutureCallback<String>() { 
       @Override 
       public void onSuccess(String value) { 
        if (!result.hasResult()) { 
        result.setResult(value); 
        } 
       } 

       @Override 
       public void onFailure(Throwable throwable) {} 
       }, 
       // To avoid race in the callback: 
       serializingExecutor)); 

    return result; 
} 
+0

请解释一下这是干什么的。你是如何实现的?我想为列表中的第一个成功的未来设定结果,或者如果超时还没有到期,还没有从所有期货中获得成功的结果?对'hasResult'的调用是不必要的。如果结果已经设置,'setResult'返回'false'。 –

这确实返回包装对象与期货的结果管理超时并指示是否全部完成(allFinished属性)之前完成的列表。

在您的处理代码中,您可以检查是否全部完成,并返回整个列表或只是第一个项目(或者没有,如果没有未来完成)。

DeferredResult<ResultWrapper> foo() { 
     DeferredResult<ResultWrapper> result = new DeferredResult<>(3000L); 

     ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3)); 
     List<ListenableFuture<String>> fList = IntStream.range(1, 1000) 
       .mapToObj(i -> executor.submit(() -> String.valueOf(i))).collect(Collectors.toList()); 

     List<String> resps = Lists.newArrayListWithCapacity(fList.size()); 
     Object lock = new Object(); 

     fList.forEach(f -> Futures.addCallback(f, new FutureCallback<String>() { 
      @Override 
      public void onSuccess(String resp) { 
       synchronized (lock) { 
        resps.add(resp); 
        result.setResult(new ResultWrapper(
          resps.size() == fList.size(), 
          ImmutableList.copyOf(resps))); 
       } 

      } 

      @Override 
      public void onFailure(Throwable t) { 
       // NOP 
      } 
     })); 

     return result; 
    } 

    private static class ResultWrapper { 

     private boolean allFinished; 
     private List<String> list; 

     public ResultWrapper(boolean allFinished, List<String> list) { 
      this.allFinished = allFinished; 
      this.list = list; 
     } 

     public boolean isAllFinished() { 
      return allFinished; 
     } 

     public List<String> getList() { 
      return list; 
     } 
    }