java8 Steam 流水线分析
分析背景: java8的出现,引入了两个元素,lambda表达式,和Stream.方便我们少写代码和操作集合;这里我们详细分析一下.Stream流水线,是如何帮助我们快速实现复杂的集合操作.
分析目的: Stream流水线,(1)Stream流水线为什么通过几行代码,就能实现这么复杂的操作需求;(2)他为什么操作的速度要比我们普通的操作方式要快.(3),Stream里面提供了并行操作的代码(未分析)
1.两种操作集合方式的案例分析
通过Stream流水线
final long totalPointsOfOpenTasks = tasks
.stream()
.filter( task -> task.getStatus() == Status.OPEN )
.mapToInt( Task::getPoints )
.sum();
//这里是我们,模拟的一个Stream操控集合的代码.
//其中task是一个实体任务.
//.filter是过滤实体中满足要求的实体.
//.mapToInt也是获得一个满足要求的实体中的内容.
//.sum是获得结果.
通过常规的集合操作方式.
这里我们如果用常规方法来实现的话.
//第一种遍历多次得到结果(过滤元素,获得其中属性,求和).遍历一次(在循环的过程中,一起操作,获得对象,过滤,求和)
int sum=0;
for(int i=0;i<taskList.size;i++){
if(task.getStatus()==open){
sum+=task.get[i].getPoints;
}}
注: 如何这个集合操作里面加入 排序 呢? 实现起来,可能就复杂了.
2.设计思路,
简单的思路:模拟我们上面的常规的集合操作方式,我们统一执行上面的循环,找到一个起始的点,把所有的步骤,全部按照顺序连接起来,并按照固定的规则执行.那么就需要解决这样的问题,1.(记录)记录每一个操作点<具体操作,执行顺序> 2.(执行) 按照一定的规则,按照记录执行代码,完成的内容就类似于上面的常规实现方法的优质解法.
3.实现过程.
如果我们自己来设计这里的代码,按照一种合适的数据结构,把所有的信息拼接起来,然后有一个方法,触发循环,完成类似于我们第二段代码那样的操作().达到减少循环次数,减少中间变量的目的(至于,如何按照合适的执行顺序,既满足代码顺序,又满足结果,再仔细设计).下面,针对源码内容进行简单分析.
介绍几个概念: 1.AbstractPipeline接口 生产线 主要用于制造生产线,将每一个节点记录下来.
2.sink接口 槽点 (有道翻译的) 主要用于执行一系列集合操作的. (sink接口实现consumer接口,consumer接口是Spliterator迭代器执行操作用到的接口) sink的操作大致包含(开始begin()结束end,接收acept()(继承于consumer的方法))
3.Stream的操作 中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算.
源码分析:
1.先记录每一个节点<具体操作,操作顺序>,我们来看一下.filter().mapToint().sum().都干了啥.
code_1
//中间操作
//在执行.filter方法的时候返回一个StatelessOp对象(无状态操作),他是继承于pipeline(生产线),
//我们来看一下,他为我们做了什么,这个对象为我们提供了一个opWrapSink()的方法,并且记录了调
//用.filter()的对象(方法里面的this,也就是生产线的上流),而且opWrapSink记录了操作(predicate.test(u)),那
//么操作和顺序,都被记录下来了.
//在最后触发的时候,这些抽象类被实例化之后,就可以执行这些重写的方法,而且也了利用到了<操作,顺序>
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
code_2
//中间操作
//在执行.mapToInt()方法的时候返回一个StatelessOp对象(无状态操作),他是继承于pipeline(生产线),
//我们来看一下,他为我们做了什么,这个对象为我们提供了一个opWrapSink()的方法,并且记录了调
//用.mapToInt()的对象(方法里面的this,也就是生产线的上流),而且opWrapSink记录了操作//mapper.applyAsInt(u),那
//么操作和顺序,都被记录下来了.
//在最后触发的时候,这些抽象类被实例化之后,就可以执行这些重写的方法,而且也了利用到了<操作,顺序>
@Override
public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT, Integer>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.applyAsInt(u));
}
};
}
};
}
code_3
//结束操作
//通过结束操作的方法,一层一层点进去,就能找到把之前每一个中间操作对象串起来的代码.
@Override
public final int sum() {
return reduce(0, Integer::sum);
}
2.结束操作代码,触发执行集合操作
--return 进入下面的方法,上面的是并行操作集合的代码.
code_4
//return 进入下面的方法,上面的是并行操作集合的代码.
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
--继续跟,会跟到一个,拼装sink操作,wrapSink
code_5
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
code_6
//这个方法完成对sink对象的拼装,拼装成一种类似于树结构的数据
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {//循环每一个实例化过的 abstracPipeline,
//p.previousestage,获得前一个实例化对象.
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
//这里的WrapSink去哪里寻找? 在我们第一步,记录每一个结点的时候,已经创建了每一个
//AbstractPipeline 对象的 opWrapSink()方法,这个方法返回一个sink对象,见上一步.
//结果是: sink=new sink("",sink) 的结构,把所有的sink拼装在了一起.
}
return (Sink<P_IN>) sink;
}
拼装完sink数据,我们继续看code_5里面的copyInto的方法
code_7
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
//是否是短路操作
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
//我们查看下面的方法,易理解
copyIntoWithCancel(wrappedSink, spliterator);
}
}
code_8
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
//1.循环获得 每一个节点,获得初始结点的 实例化对象
while (p.depth > 0) {
p = p.previousStage;
}
//2.执行sink方法(),按照顺序完成所有begin方法,所有acept方法,所有end方法,
//每一个sink对象的begin方法,已经在实例化的时候,创建了,具体看code3,4,5的代码
wrappedSink.begin(spliterator.getExactSizeIfKnown());
//forEach 方法里面会执行 do { } while (!sink.cancellationRequested() && //spl.tryAdvance(adaptedSink)); 这个方法,这个方法是Spliterator执行迭代的时候用到的方法
//tryAdvance里面会用到我们刚开始为sink实例创建的acept方法.这样为迭代器提供了收集的方法
p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
}
4.具体在使用Stream操作集合和数组的时候,还有一些坑等着踩.
关于有状态操作他的执行方式,有些特殊,他是产生了两个sink链.
Stream 在执行有状态操作的时候,是如何执行的 这里他解决掉了sorted,在什么时候执行排序的问题. 因为排序的顺序影响结果,所以,排序方法的执行,一定要按照顺序执行.
在这里,Stream的处理方式是(分成两个sink链进行执行),先从头顺序执行的所有的begin,然后执行到sorted所对应的begin的时候,不继续向下执行begin,这个时候就会执行sort,在sort里的
end方法可以看到,他先执行完自己的排序方法,继续执行剩下的begin方法.然后收集acept方法,然后end方法.
代码从可以看出来 入下.
详细参考文章地址:
http://www.cnblogs.com/CarpenterLee/p/6637118.html 深入理解Java Stream流水线
http://www.cnblogs.com/Dorae/p/7779246.html java8Stream原理深度解析
5.待解决的问题.
并行实现方式分析爱