如何将1 completablefuture划分为流中的许多Completablefuture?
例如,我有这样的方法:如何将1 completablefuture划分为流中的许多Completablefuture?
public CompletableFuture<Page> getPage(int i) {
...
}
public CompletableFuture<Document> getDocument(int i) {
...
}
public CompletableFuture<Void> parseLinks(Document doc) {
...
}
而且我的流程:
List<CompletableFuture> list = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i))
// I want method like this:
.thenApplyAndSplit(CompletableFuture<Page> page -> {
List<CompletableFuture<Document>> docs = page.getDocsId()
.stream()
.map(i -> getDocument(i))
.collect(Collectors.toList());
return docs;
})
.map(CompletableFuture<Document> future -> {
return future.thenApply(Document doc -> parseLink(doc);
})
.collect(Collectors.toList());
它应该像flatMap()
为CompletableFuture
,所以我想实现这个流程:
List<Integer> -> Stream<CompletableFuture<Page>>
-> Stream<CompletableFuture<Document>>
-> parse each
UPDATE
Stream<CompletableFuture<Page>> pagesCFS = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i));
Stream<CompletableFuture<Document>> documentCFS = listCFS.flatMap(page -> {
// How to return stream of Document when page finishes?
// page.thenApply(...)
})
你真的不得不使用Streams吗?难道你不能只对你的CompletableFutures
进行一些相关的操作吗?特别是自上一次调用返回CompletableFutures<Void>
(当然,也有可能使用Collection.forEach
)
List<CompletableFuture<Page>> completableFutures = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i)).collect(Collectors.toList());
for (CompletableFuture<Page> page : completableFutures) {
page.thenAccept(p -> {
List<Integer> docsId = p.getDocsId();
for (Integer integer : docsId) {
getDocument(integer).thenAccept(d-> parseLinks(d));
}
});
}
编辑:嗯,所以我作了一次尝试,但我不知道这是否是一个好主意,因为我我不是CompletableFuture
的专家。
使用下面的方法(也许有可能是一个更好的实现):
public static <T> CompletableFuture<Stream<T>> flatMapCF(Stream<CompletableFuture<T>> stream){
return CompletableFuture.supplyAsync(()->
stream.map(CompletableFuture::join)
);
}
Stream<CompletableFuture<Page>> pagesCFS = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i));
CompletableFuture<Stream<Page>> pageCF = flatMapCF(pagesCFS);
CompletableFuture<Stream<Document>> docCF=
pageCF.thenCompose(a ->
flatMapCF(a.flatMap(
b -> b.getDocsId()
.stream()
.map(c -> getDocument(c))
)));
这个问题可能是,这是CompletableFuture
回报,只有当所有的结果都可以
的问题,我不要不知道有多少文档页面。未来完成后我可以知道这个 – mystdeim
@mystdeim:但是应该在完成它的'CompletableFuture
没关系,但我想在最后得到一个Document流。也许这对于jdk 8是不可能的,我应该使用RxJava,因为flatMap运算符在那里存在 – mystdeim
如果你不关心当操作完成,那么下面将简单地在所有文件上触发parseLinks()
:
IntStream.range(0, 10)
.mapToObj(this::getPage)
.forEach(pcf -> pcf
.thenAccept(page -> page
.getDocsId()
.stream()
.map(this::getDocument)
.forEach(docCF -> docCF.thenCompose(this::parseLinks))));
行吟rwise,因为你上次的操作返回CompletableFuture<Void>
,我假设你主要有兴趣知道什么时候一切都完成了。你可以做这样的事情:
CompletableFuture<Void> result = CompletableFuture.allOf(IntStream.range(0, 10)
.mapToObj(this::getPage)
.map(pcf -> pcf
.thenCompose(page -> CompletableFuture.allOf(page
.getDocsId()
.stream()
.map(docId -> getDocument(docId)
.thenCompose(this::parseLinks))
.toArray(CompletableFuture[]::new))))
.toArray(CompletableFuture[]::new));
如果你有兴趣的个人CompletableFuture
S的结果,最好的可能是直接在流中处理它们,在创建它们的地方。
你甚至可以用可重用的方法来包装这一切。例如,如果parseLinks()
被返回CompletableFuture<List<String>>
,你可以定义一个方法是这样的:
public CompletableFuture<Void> processLinks(Function<? super CompletableFuture<List<String>>, ? extends CompletableFuture<?>> processor) {
return CompletableFuture.allOf(IntStream.range(0, 10)
.mapToObj(this::getPage)
.map(pcf -> pcf
.thenCompose(page -> CompletableFuture.allOf(page
.getDocsId()
.stream()
.map(docId -> getDocument(docId)
.thenCompose(this::parseLinks))
.map(processor) // here we apply the received function
.toArray(CompletableFuture[]::new))))
.toArray(CompletableFuture[]::new));
}
,并处理由此产生的名单是这样的:
processLinks(linksCF -> linksCF
.thenAccept(links -> links.forEach(System.out::println)));
返回CompletableFuture
将完成一次所有链接已打印。
我也想为CompletableFutures
的码流实施Spliterator
拍一下,所以这里是我的尝试。
需要注意的是,如果你是在平行模式使用此,要注意使用不同的ForkJoinPool
为流和本CompletableFuture
的背后运行的任务。该流将等待未来完成,因此,如果共享相同的执行程序,甚至发生死锁,您实际上可能会失去性能。
因此,这里是实现:
public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {
return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
}
public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
boolean parallel) {
return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
}
public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
private List<CompletableFuture<? extends T>> futures;
CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
futures = stream.collect(Collectors.toList());
}
CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
this.futures = new ArrayList<>(Arrays.asList(futures));
}
CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
this.futures = new ArrayList<>(futures);
}
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
if (futures.isEmpty())
return false;
CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
// now at least one of the futures has finished, get its value and remove it
ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
while (it.hasPrevious()) {
final CompletableFuture<? extends T> future = it.previous();
if (future.isDone()) {
it.remove();
action.accept(future.join());
return true;
}
}
throw new IllegalStateException("Should not reach here");
}
@Override
public Spliterator<T> trySplit() {
if (futures.size() > 1) {
int middle = futures.size() >>> 1;
// relies on the constructor copying the list, as it gets modified in place
Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
futures = futures.subList(middle, futures.size());
return result;
}
return null;
}
@Override
public long estimateSize() {
return futures.size();
}
@Override
public int characteristics() {
return IMMUTABLE | SIZED | SUBSIZED;
}
}
它通过变换给出Stream<CompletableFuture<T>>
到这些期货的List
- 假设建立流快,辛勤工作由期货本身做,所以列出它不应该是昂贵的。这也确保所有任务已经被触发,因为它强制处理源流。
为了生成输出流,它只是等待任何未来完成之前流传输其值。
一个简单的非平行使用例(执行程序用于CompletableFuture
S,以便同时启动它们全部):
ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((i % 10) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
return i;
}, executor)), false)
.forEach(x -> {
System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
});
executor.shutdown();
输出:
Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13
…
作为你可以看到,即使期货没有按期完成,该流几乎立即产生价值。
的问题把它应用到的例子中,这将给(假设parseLinks()
回报CompletableFuture<String>
代替~<Void>
):
flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
.mapToObj(this::getPage)
// the next map() will give a Stream<CompletableFuture<Stream<String>>>
// hence the need for flattenStreamOfFuturesOfStream()
.map(pcf -> pcf
.thenApply(page -> flattenStreamOfFutures(page
.getDocsId()
.stream()
.map(this::getDocument)
.map(docCF -> docCF.thenCompose(this::parseLinks)),
false))),
false)
.forEach(System.out::println);
基本上你需要flatMap CompletableFuture成流 –