Stream 01

Streams API(I)

  • Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator

而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

1.0-1.4 中的 java.lang.Thread

5.0 中的 java.util.concurrent

6.0 中的 Phasers 等

7.0 中的 Fork/Join 框架

8.0 中的 Lambda、Stream

Stream 的另外一大特点是,数据源本身可以是无限的。

  • 流的构成

当我们使用一个流的时候,通常包括三个基本步骤:

获取一个数据源(source)→ 数据转换→执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,如下图所示。

Stream 01

流管道 (Stream Pipeline) 

Stream 01

在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。

我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

简单说,对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用(side effect)。


Stream 01

其中各个部分的主要功能为:

  • 主要是各种操作的工厂类、数据的存储结构以及收集器的工厂类等;

  • 主要用于Stream的惰性求值实现;

  • Stream的并行计算框架;

  • 存储并行流的中间结果;

  • 终结操作的定义;

Java 8加入函数式编程扩充了很多功能,Java 8之所以费这么大功夫引入函数式编程,原因有二:

  1. 代码简洁,函数式编程写出的代码简洁且意图明确,使用stream接口让你从此告别for循环。

  2. 多核友好,Java函数式编程使得编写并行程序从未如此简单,你需要的全部就是调用一下parallel()方法。

这一节我们学习stream,也就是Java函数式编程的主角。对于Java 7来说stream完全是个陌生东西,stream并不是某种数据结构,它只是数据源的一种视图。这里的数据源可以是一个数组,Java容器或I/O channel等。正因如此要得到一个stream通常不会手动创建,而是调用对应的工具方法,比如:

  • 调用Collection.stream()或者Collection.parallelStream()方法

  • 调用Arrays.stream(T[] array)方法

常见的stream接口继承关系如图:

Stream 01

图中4种stream接口继承自BaseStream,

IntStream, LongStream, DoubleStream对应三种基本类型(int, long, double,注意不是包装类型)之所以有这三种专门针对基本类型的设计,是为了可以处理基本类型时不用自动拆装箱,从而提高性能。

Stream对应所有剩余类型的stream视图。

 

虽然大部分情况下stream是容器调用Collection.stream()方法得到的,但stream和collections有以下不同:

  • 无存储。stream不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java容器或I/O channel等。

  • 为函数式编程而生。对stream的任何修改都不会修改背后的数据源,比如对stream执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新stream。

  • 惰式执行。stream上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。

  • 可消费性。stream只能被“消费”一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。

对stream的操作分为为两类,中间操作(intermediate operations)和结束操作(terminal operations),二者特点是:

  1. 中间操作总是会惰式执行,调用中间操作只会生成一个标记了该操作的新stream,仅此而已。

  2. 结束操作会触发实际计算,计算发生时会把所有中间操作积攒的操作以pipeline的方式执行,这样可以减少迭代次数。计算完成之后stream就会失效。

如果你熟悉Apache Spark RDD,对stream的这个特点应该不陌生。

下表汇总了Stream接口的部分常见方法:

操作类型 接口方法
中间操作

concat() distinct() filter() flatMap() limit() map() peek()

skip() sorted() parallel() sequential() unordered()

结束操作

allMatch() anyMatch() collect() count() findAny() findFirst()

forEach() forEachOrdered() max() min() noneMatch() reduce() toArray()

区分中间操作和结束操作最简单的方法,就是看方法的返回值,返回值为stream的大都是中间操作,否则是结束操作。

stream方法使用

stream跟函数接口关系非常紧密,没有函数接口stream就无法工作。回顾一下:函数接口是指内部只有一个抽象方法的接口。通常函数接口出现的地方都可以使用Lambda表达式,所以不必记忆函数接口的名字。

forEach()

我们对forEach()方法并不陌生,在Collection中我们已经见过。方法签名为void forEach(Consumer<? super E> action),作用是对容器中的每个元素执行action指定的动作,也就是对元素进行遍历。

// 使用Stream.forEach()迭代

Stream<String> stream = Stream.of("I", "love", "you", "too");

stream.forEach(str -> System.out.println(str));

由于forEach()是结束方法,上述代码会立即执行,输出所有字符串。

filter()

Stream 01

函数原型为Stream<T> filter(Predicate<? super T> predicate),作用是返回一个只包含满足predicate条件元素的Stream。

// 保留长度等于3的字符串

Stream<String> stream= Stream.of("I", "love", "you", "too");

stream.filter(str -> str.length()==3)

.forEach(str -> System.out.println(str));

上述代码将输出为长度等于3的字符串you和too。注意,由于filter()是个中间操作,如果只调用filter()不会有实际计算,因此也不会输出任何信息。

distinct()

Stream 01

函数原型为Stream<T> distinct(),作用是返回一个去除重复元素之后的Stream。

Stream<String> stream= Stream.of("I", "love", "you", "too", "too");

stream.distinct()

.forEach(str -> System.out.println(str));

上述代码会输出去掉一个too之后的其余字符串。

 

 

sorted()

排序函数有两个,一个是用自然顺序排序,一个是使用自定义比较器排序,函数原型分别为Stream<T> sorted()和Stream<T> sorted(Comparator<? super T> comparator)。

Stream<String> stream= Stream.of("I", "love", "you", "too");

stream.sorted((str1, str2) -> str1.length()-str2.length())

.forEach(str -> System.out.println(str));

上述代码将输出按照长度升序排序后的字符串,结果完全在预料之中。

map()

Stream 01Stream 01

函数原型为<R> Stream<R> map(Function<? super T,? extends R> mapper),作用是返回一个对当前所有元素执行执行mapper之后的结果组成的Stream。直观的说,就是对每个元素按照某种操作进行转换,转换前后Stream中元素的个数不会改变,但元素的类型取决于转换之后的类型。

Stream<String> stream = Stream.of("I", "love", "you", "too");

stream.map(str -> str.toUpperCase())

.forEach(str -> System.out.println(str));

上述代码将输出原字符串的大写形式。

flatMap()

Stream 01

函数原型为<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper),作用是对每个元素执行mapper指定的操作,并用所有mapper返回的Stream中的元素组成一个新的Stream作为最终返回结果。说起来太拗口,通俗的讲flatMap()的作用就相当于把原stream中的所有元素都"摊平"之后组成的Stream,转换前后元素的个数和类型都可能会改变。

Stream<List<Integer>> stream = Stream.of(Arrays.asList(1,2), Arrays.asList(3, 4, 5));

stream.flatMap(list -> list.stream())

.forEach(i -> System.out.println(i));

上述代码中,原来的stream中有两个元素,分别是两个List<Integer>,执行flatMap()之后,将每个List都“摊平”成了一个个的数字,所以会新产生一个由5个数字组成的Stream。所以最终将输出1~5这5个数字。

结语

截止到目前我们感觉良好,已介绍Stream接口函数理解起来并不费劲儿。如果你就此以为函数式编程不过如此,恐怕是高兴地太早了。下一节对Stream规约操作的介绍将刷新你现在的认识。

 

Stream 的特性可以归纳为:

  • 不是数据结构

  • 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。

  • 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。

  • 所有 Stream 的操作必须以 lambda 表达式为参数

  • 不支持索引访问

  • Intermediate 操作永远是惰性化的。

  • 并行能力,当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。

  • 可以是无限的,集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。