批量执行流操作

问题描述:

我有n个条目的有限流。它的数量是未知提前。数据大小在10Gb左右,适合内存很大,所以我无法将其作为一个整体读取。 什么是在每个100000条目之后以块的形式处理该流的方法?批量执行流操作

Stream<?> blocks 

所以subList方法不适用于我。

我能想象它是这样的代码:

IntStream 
      .range(0, Integer.MAX_VALUE) 
      .filter(s -> s % 100000 == 0) 
      .mapToObj(s -> blocks 
        .skip(s) 
        .collect(Collectors.toList()) 
        .forEach(MyClass::doSomething) 
      ); 

但后来我得到错误,因为限制是终端运营商,它关闭流。有一些解决方法吗?

+0

可能的重复[Java 8 Stream IllegalStateException:Stream已经被操作或关闭](https://*.com/questions/27990451/java-8-stream-illegalstateexception-stream-has-already-been-操作或关闭) – lackerman

+1

我明白为什么会出现该错误,但它不能帮助我提出解决方法。 – lapkritinis

+1

“*因为限制是终端操作员*”'限制“不是终端操作,并且代码中根本没有”限制“操作。 – Holger

这样看来,你将不得不使用Spliterator have a look at the Java docs作为答案提示前面的问题

下面一个简单的例子将在10产生输出阻止大块,同时保持溪流畅通。

Stream<Block<Integer>> blocks = IntStream 
    .range(0, 1000) 
    .mapToObj(Block::new); 

Spliterator<Block<Integer>> split = blocks.spliterator(); 
int chunkSize = 10; 

while (true) { 
    List<Block<Integer>> chunk = new ArrayList<>(10); 
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) { 
    chunk.get(i).doSomething(); 
    } 
    System.out.println(); 
    if (chunk.isEmpty()) break; 
} 

class Block<T> { 
    private T value; 

    Block(T value) { 
    this.value = value; 
    } 

    void doSomething() { 
    System.out.print("v: " + value); 
    } 
} 

如果

IntStream 
     .range(0, Integer.MAX_VALUE) 
     .filter(s -> s % 100000 == 0) 
     .mapToObj(s -> blocks 
       .skip(s) 
       .collect(Collectors.toList()) 
       .forEach(MyClass::doSomething) 
     ); 

编译没有错误,方法doSomething必须是在同一时间接收blocks单个元素的方法,因为这就是List.forEach(…)不单独叫了消费者对每个元素。 (忽略List.forEachvoid的事实,因此不能满足外部流中的mapToObj(…))。在这种情况下,在“配料”没有任何好处可言,你可以只使用

blocks.forEachOrdered(MyClass::doSomething); 

,因为这将在同一时间加载一个元素,并允许加工后作为垃圾收集的每个元素处理下一个元素时(除非doSomething在某处存储引用)。

你试图收集十万要素引入List调用doSomething不会提高性能之前,作为流仍然会加载每个元素一个接一个,你接连仍在处理一个元素。它只能防止最多99999个元素的垃圾回收,直到第100000个处理完成。这没有好处。