Java 8 学习笔记4——流的概念

流是什么

流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。

就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码了!我们简单看看使用流的好处吧。

下面两段代码都是用来返回低热量的菜肴名称的,并按照卡路里排序,一个是用Java 7写的,另一个是用Java 8的流写的。

Java 7

List<Dish> lowCaloricDishes=new ArrayList<>();
for(Dish d:menu){
	if(d.getCalories() < 400){		//用累加器筛选元素
		lowCaloricDishes.add(d);
	}
}
Collections.sort(lowCaloricDishes,new Comparator<Dish>(){	//用匿名类对菜肴排序
	public int compare(Dish d1,Dish d2){
		return Integer.compare(d1.getCalories(),d2.getCalories());
	}
});
List<String> lowCaloricDishesName=new ArrayList<>();
for(Dish d:lowCaloricDishes){
	lowCaloricDishesName.add(d.getName());	//处理排序后的菜名列表
}

在这段代码中,你用了一个“垃圾变量”lowCaloricDishes。它唯一的作用就是作为一次性的中间容器。在Java 8中,实现的细节被放在它本该归属的库里了。

Java 8

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
List<String> lowCaloricDishesName
	=menu.stream()
		.filter(d -> d.getCalories() < 400)		//选出400卡路里以下的菜肴
		.sorted(comparing(Dish:: getCalories))	//按照卡路里排序
		.map(Dish:: getName)	//提取菜肴的名称
		.collect(toList());	//将所有名称保存在List中

为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream()

List<String> lowCaloricDishesName
	=menu.parallelStream()
		.filter(d -> d.getCalories() < 400)
		.sorted(comparing(Dishes:: getCalories))
		.map(Dish:: getName)
		.collect(toList());

从软件工程师的角度来看,新的方法有几个显而易见的好处:

  • 代码是以声明性方式写的:说明想要完成什么(筛选热量低的菜肴)而不是说明如何实现一个操作(利用循环和if条件等控制流语句)。这种方法加上行为参数化让你可以轻松应对变化的需求:你很容易再创建一个代码版本,利用Lambda表达式来筛选高卡路里的菜肴,而用不着去复制粘贴代码。
  • 你可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在filter后面接上sortedmapcollect操作,如下图所示),同时保持代码清晰可读。filter的结果被传给了sorted方法,再传给map方法,最后传给collect方法。
    Java 8 学习笔记4——流的概念
    因为filtersortedmapcollect等操作是与具体线程模型无关的高层次构件,所以它们的内部实现可以是单线程的,也可能透明地充分利用你的多核架构!在实践中,这意味着你用不着为了让某些数据处理任务并行而去操心线程和锁了,Stream API都替你做好了!

Java 8中的Stream API可以让你写出这样的代码:

  • 声明性——更简洁,更易读
  • 可复合——更灵活
  • 可并行——性能更好

下面要使用的例子是一个menu,它是一张菜肴列表:

List< Dish> menu = Arrays. asList( 
	new Dish("pork", false, 800, Dish.Type.MEAT), 
	new Dish("beef", false, 700, Dish.Type.MEAT), 
	new Dish("chicken", false, 400, Dish.Type.MEAT), 
	new Dish("french fries", true, 530, Dish.Type.OTHER), 
	new Dish("rice", true, 350, Dish.Type.OTHER), 
	new Dish("season fruit", true, 120, Dish.Type.OTHER), 
	new Dish("pizza", true, 550, Dish.Type.OTHER), 
	new Dish("prawns", false, 300, Dish.Type.FISH), 
	new Dish("salmon", false, 450, Dish.Type.FISH) 
);

Dish类的定义是:

public class Dish {

	private final String name;
	private final boolean vegetarian;
	private final int calories;
	private final Type type;

	public Dish(String name, boolean vegetarian, int calories, Type type) {
		this.name = name;
		this.vegetarian = vegetarian;
		this.calories = calories;
		this.type = type;
	}

	public String getName() {
		return name;
	}

	public boolean isVegetarian() {
		return vegetarian;
	}

	public int getCalories() {
		return calories;
	}

	public Type getType() {
		return type;
	}

	@Override
	public String toString() {
		return name;
	}

	public enum Type{
		MEAT,FISH,OTHER
	}

}

流简介

Java 8中的集合支持一个新的stream方法,它会返回一个流(接口定义在java.util.stream.Stream里)。

到底是什么?简短的定义就是“从支持数据处理操作的源生成的元素序列”,下面一步步剖析这个定义:

  • 元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据机构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayListLinkedList)。但流的目的在于表达计算,比如前面见到的filtersortedmap集合讲的是数据,流讲的是计算
  • ——流会使用一个提供数据的源,如集合、数组或输入/输出资源。注意从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
  • 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filtermapreducefindmatchsort等。流操作可以顺序执行,也可并行执行

此外,流操作有两个重要的特点:

  • 流水线——很多流操作本身会返回一个流,这样多个操作就可以连接起来,形成一个大的流水线。
  • 内部迭代——与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。

下面来看一段能够体现所有这些概念的代码:

import static java.util.stream.Collectors.toList;
List<String> threeHighCaloricDishNames
	=menu.stream()	//从menu获得流(菜肴列表)
		.filter(d -> d.getCalories() > 300)	//建立操作流水线:首先选出高热量的菜肴
		.map(Dish:: getName)	//获取菜名
		.limit(3)	//只选择头三个
		.collect(toList());	//将结果保存在另一个List中
System.out.println(threeHighCaloricDishNames);	//结果是[pork,beef,chicken]

在本例中,我们先是对menu调用stream方法,由菜单得到一个流。数据源是菜肴列表(菜单),它给流提供一个元素序列。接下来,对流应用一系列数据处理操作filtermaplimitcollect。除了collect之外,所有这些操作都会返回另一个流,这样它们就可以接成一条流水线,于是就可以看作对源的一个查询。最后,collect操作开始处理流水线,并返回结果(它和别的操作不一样,因为它返回的不是流,在这里是一个List)。在调用collect之前,没有任何结果产生,实际上根本就没有从menu里选择元素。你可以这么理解:链中的方法调用都在排队等待,直到调用collect
Java 8 学习笔记4——流的概念
上图显示了流操作的顺序:filtermaplimitcollect,每个操作简介如下。

  • filter——接受Lambda,从流中排除某些元素。在本例中,通过传递lambda d -> d.getCalories() > 300选择出热量超过300卡路里的菜肴。
  • map——接受一个Lambda,将元素转换成其他形式或提取信息。在本例中,通过传递方法引用Dish:: getName,相当于Lambdad -> d.getName(),提取了每道菜的菜名。
  • limit——截断流,使其元素不超过给定数量。
  • collect——将流转换为其他形式。在本例中,流被转换为一个列表。现在可以把collect看作能够接受各种方案作为参数,并将流中的元素累积成为一个汇总结果的操作。这里的toList()就是将流转换为列表的方案。

刚刚解释的这段代码,与逐项处理菜单列表的代码有很大不同。首先,我们使用了声明性的方式来处理菜单数据,即你说的对这些数据需要做什么:“查找热量最高的三道菜的菜名。”你并没有去实现筛选(filter)、提取(map)或截断(limit)功能,Streams库已经自带了。因此,Stream API在决定如何优化这条流水线时更为灵活。例如,筛选、提取和截断操作可以一次进行,并在找到这三道菜后立即停止。

流与集合

Java现有的集合概念和新的概念都提供了接口,来配合代表元素型有序值的数据接口。所谓有序,就是说我们一般是按顺序取用值,而不是随机取用的。那这两者有什么区别呢?

比如说存在DVD里的电影,这就是一个集合(也许是字节,也许是帧,这个无所谓),因为它包含了整个数据结构。现在再来想想在互联网上通过视频流看同样的电影。现在这是一个(字节流或帧流)。流媒体视频播放器只要提前下载用户观看位置的那几帧就可以了,这样不用等到流中大部分值计算出来,你就可以显示流的开始部分了(想想观看直播足球赛)。特别要注意,视频播放器可能没有将整个流作为集合,保存所需要的内存缓冲区——而且要是非得等到最后一帧出现才能开始看,那等待的时间就太长了。出于实现的考虑,你也可以让视频播放器把流的一部分缓存在集合里,但和概念上的差异不是一回事。

粗略地说,集合与流之间的差异就在于什么时候进行计算

集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(你可以往集合里加东西或者删东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)

相比之下,**流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。**这对编程有很大的好处。用户仅仅从流中提取需要的值,而这些值——在用户看不见的地方——只会按需生成。这是一种生产者-消费者的关系。从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值(用管理学的话说这就是需求驱动,甚至是实时制造)。

与此相反,集合则是急切创建的(供应商驱动:先把仓库装满,再开始卖,就像那些昙花一现的圣诞新玩意儿一样)。以质数为例,要是想创建一个包含所有质数的集合,那这个程序算起来就没完没了了,因为总有新的质数要算,然后把它加到集合里面。当然这个集合是永远也创建不完的,消费者这辈子都见不着了。

下图用DVD对比在线流媒体的例子展示了流和集合之间的差异。
Java 8 学习笔记4——流的概念
另一个例子是用浏览器进行互联网搜索。假设你搜索的短语在Google或是网店里面有很多匹配项。你用不着等到所有结果和照片的集合下载完,而是得到一个流,里面有最好的10个或20个匹配项,还有一个按钮来查看下面10个或20个。当你作为消费者点击“下面10个”的时候,供应商就按需计算这些结果,然后再送回你的浏览器上显示。

只能遍历一次

和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。你可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集合之类的可重复的源,如果是I/O通道就没戏了)。例如,以下代码会抛出一个异常,说流已被消费掉了:

List<String> title=Arrays.asList("Java8","In","Action");
Stream<String> s=title.stream();
s.forEach(System.out:: println);	//打印标题中的每个单词
s.forEach(System.out:: println);	//java.lang.IllegalStateException:流已被操作或关闭

所以要记得,流只能消费一次!

集合和流的另一个关键区别在于它们遍历数据的方式

外部迭代与内部迭代

流利用内部迭代:迭代通过filtermapsorted等操作被抽象掉了。

使用Collection接口需要用户去做迭代(比如用for-each),这称为外部迭代。相反,Streams库使用内部迭代——它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出一个函数说要干什么就可以了。下面的3段代码说明了这种区别。

  1. 集合:用for-each循环外部迭代
List<String> names=new ArrayList<>();
for(Dish d:menu){	//显式顺序迭代菜单列表
	names.add(d.getName());	//提取名称并将其添加到累加器
}

for-each还隐藏了迭代中的一些复杂性。for-each结构是一个语法糖,它背后的东西用Iterator对象表达出来更要丑陋得多。

  1. 集合:用背后的迭代器做外部迭代
List< String> names = new ArrayList<>(); 
Iterator<String> iterator = menu.iterator(); 
while(iterator.hasNext()) { 	//显式迭代 
	Dish d = iterator.next(); 
	names.add(d.getName()); 
}
  1. 流:内部迭代
List<String> names=menu.stream()
					.map(Dish:: getName)	//用getName方法参数化map,提取菜名
					.collect(toList());	//开始执行操作流水线;没有迭代!

用一个比喻来解释内部迭代的差异和好处。比方说你在和你两岁的女儿索菲亚说话,希望她能把玩具收起来。
Java 8 学习笔记4——流的概念
这正是你每天都要对Java集合做的。你外部迭代一个集合,显式地取出每个项目再加以处理。如果你只需跟索菲亚说“把地上所有的玩具都放进盒子里”就好了。

内部迭代比较好的原因有二:

第一,索非亚可以选择一只手拿娃娃,另一只手拿球;

第二,她可以决定先拿离盒子最近的那个东西,然后再拿别的。同样的道理,内部迭代时,项目可以透明地并行处理,或者用更优化的顺序进行处理。要是用Java过去的那种外部迭代方法,这些优化都是很困难的。

这似乎有点儿鸡蛋里挑骨头,但这差不多就是Java 8引入流的理由了——Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。与此相反,一旦通过写for-each而选择了外部迭代,那你基本上就要自己管理所有的并行问题了(自己管理实际上意味着“某个良辰吉日我们会把它并行化”或“开始了关于任务和synchronized的漫长而艰苦的斗争”)。Java 8需要一个类似于Collection却没有迭代器的接口,于是就有了Stream

下图说明了内部迭代)与集合外部迭代)之间的差异。
Java 8 学习笔记4——流的概念
前面已经说过了集合与流在概念上的差异,特别是流利用了内部迭代:替你把迭代做了。但是,只有你已经预先定义好了能够隐藏迭代的操作列表,例如filtermap,这个才有用。大多数这类操作都接受Lambda表达式作为参数,因此你可以用前面介绍的方法来参数化其行为。Java语言的设计者给Stream API配上了一大套可以用来表达复杂数据处理查询的操作。

流操作

java.util.stream.Stream中的Stream接口定义了许多操作。它们可以分为两大类。我们再来看一下前面的例子:

List<String> names=menu.stream()		//从菜单获得流
                        .filter(d -> d.getCalories() > 300)		//中间操作
                        .map(Dish:: getName)		//中间操作
                        .limit(3)		//中间操作
                        .collect(toList());		//将Stream转换为List

你可以看到两类操作:

  • filtermaplimit可以连成一条流水线
  • collect触发流水线执行并关闭它

可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作。下图展示了这两类操作。
Java 8 学习笔记4——流的概念

中间操作

诸如filtersorted中间操作会返回另一个流。这让多个操作可以连接起来形成一个查询。重要的是,除非流水线上触发一个终端操作,否则中间操作不会执行任何处理——它们很懒。这是因为中间操作一般都可以合并起来,在终端操作时一次性全部处理。

filtermap等中间操作会返回一个流,并可以链接在一起。可以用它们来设置一条流水线,但并不会生成任何结果。

为了搞清楚流水线中到底发生了什么,我们把代码改一改,让每个Lambda都打印出当前处理的菜肴:

List<String> names=menu.stream()
                        .filter(d -> {
                            			System.out.println("filtering"+d.getName());
                            			return d.getCalories() > 300;
                       				 })		//打印当前筛选的菜肴
                        .map(d -> {
                                    	System.out.println("mapping"+d.getName());
                                    	return d.getName();
                                   })		//提取菜名时打印出来
                        .limit(3)
                        .collect(toList());
System.out.println(names);

此代码执行时将打印:

filtering pork 
mapping pork 
filtering beef 
mapping beef 
filtering chicken 
mapping chicken 
[pork, beef, chicken]

你会发现,有好几种优化利用了流的延迟性质。第一,尽管很多菜的热量都高于300卡路里,但只选出了前三个!这是因为limit操作和一种称为短路的技巧。第二,尽管filtermap是两个独立的操作,但它们合并到同一次遍历中了(我们把这种技术叫作循环合并)。

终端操作

forEachcount等终端操作会返回一个非流的值,并处理流水线以返回结果。

终端操作会从流的流水线生成结果其结果是任何不是流的值,比如ListInteger,甚至void。例如,在下面的流水线中,forEach是一个返回void的终端操作,它会对源中的每道菜应用一个Lambda。把System.out.println传递给forEach,并要求它打印出由menu生成的流中的每一个Dish

menu.stream().forEach(System.out:: println);

使用流

流的使用一般包括三件事:

  • 一个数据源(如集合)来执行一个查询;
  • 一个中间操作链,形成一条流的流水线;
  • 一个终端操作,执行流水线,并能生成结果。

流的流水线背后的理念类似于构建器模式。在构建器模式中有一个调用链用来设置一套配置(对流来说这就是一个中间操作链),接着是调用built方法(对流来说就是终端操作)。

为方便起见,下面两个表总结了前面在代码例子中看到的中间流操作终端流操作。但这并不能涵盖Stream API提供的操作,后面还会看到更多。

  • 中间操作:
操作 类型 返回类型 操作参数 函数描述符
filter 中间 Stream< T> Predicate< T> T -> boolean
map 中间 Stream< T> Function< T, R> T -> R
limit 中间 Stream< T>
sorted 中间 Stream< T> Comparator< T> (T, T) -> int
distinct 中间 Stream< T>
  • 终端操作:
操作 类型 目的
forEach 终端 消费流中的每个元素并对其应用Lambda,这一操作返回void
count 终端 返回流中元素的个数,这一操作返回long
collect 终端 把流归约成一个集合,比如List、Map甚至是Integer