Spark实战第二版(涵盖Spark3.0)--第三章 宏伟的角色--dataframe
关注公众号:登峰大数据,阅读Spark实战第二版(完整中文版),系统学习Spark3.0大数据框架!
如果您觉得作者翻译的内容有帮助,请分享给更多人。您的分享,是作者翻译的动力!
本章涵盖了
-
使用dataframe
-
在Spark中,dataframe的基本作用
-
理解数据不变性
-
快速调试一个dataframe的schema
-
理解RDDs中的底层存储
在本章中,你将学习如何使用dataframe。dataframe在Spark应用程序中非常重要,因为它通过模式包含类型化数据,并提供了强大的API。
正如您在前面章节中看到的,Spark是一个了不起的分布式分析引擎。维基百科将操作系统定义为“管理计算机硬件和软件资源,并为计算机程序提供公共服务的系统软件”。在第1章中,我甚至将Spark定义为操作系统,因为它提供了构建应用程序和管理资源所需的所有服务。要以编程的方式使用Spark,您需要了解它的一些关键api。为了执行分析和数据操作,Spark需要存储,包括逻辑存储(在应用程序级)和物理存储(在硬件级)。
在逻辑级别,最好的存储容器是dataframe,它是一种类似于关系数据库中的表的数据结构。在这一章中,你将深入研究dataframe的结构,并学习如何通过API使用dataframe。
转换是对数据执行的操作,例如从日期中提取年份、组合两个字段、规范化数据等等。在本章中,您将学习如何使用特定于dataframe的函数来执行转换,以及直接附加到dataframe API的方法。您将使用类似SQL union的操作将两个dataframe合并为一个dataframe。您还将看到dataset和dataframe之间的区别,以及如何相互转换。
最后,您将看到弹性分布式数据集(RDD),它是Spark中的第一代存储。dataframe构建在RDD概念之上,您可能会在讨论和项目中遇到RDDs。
本章的例子被分为多个实验。在本章的最后,您将在两个dataframes中接入两个文件,修改它们的模式以便它们匹配,并合并结果。在执行这些操作时,您将看到Spark如何处理存储。在不同的步骤中,您将检查dataframes。
实验
本章中的例子可以在GitHub中获得:https://github com/jgperrin/net.jgp.books.spark.ch03
3.1 Spark中dataframe的基本作用
在本节中,您将了解什么是dataframe以及它是如何组织的。你还会学到关于不变性的知识。
dataframe既是数据结构又是API,如图3.1所示。在Spark SQL、Spark流处理、MLlib(用于机器学习)和GraphX中使用Spark的dataframe API来操作Spark中的基于图的数据结构。使用这种统一的API可以极大地简化对这些技术的访问。您不必学习每个子库的API。
图3.1 通过只学习一种API来执行Spark SQL、流处理、机器和深度学习以及基于图的分析,开发人员会更高兴!
将dataframe描述为宏伟的,可能有点奇怪,但这个限定符非常适合它。就像宏伟的艺术品吸引着人们的好奇心,一棵宏伟的橡树统治着森林,宏伟的城墙保护着一座城堡,dataframe在Spark世界中也是宏伟的。
3.1.1dataframe是如何组织数据的
在本节中,您将了解dataframe如何组织数据的。dataframe是由指定列组成的一组记录。它相当于关系数据库中的表或Java中的ResultSet。图3.2演示了一个dataframe。
可以从各种各样的源(如文件、数据库或自定义数据源)构建数据框架。dataframe的关键概念是它的API,该API在Java、Python、Scala和r中都可用。在Java中,dataframe由行数据集表示:dataset <Row>。
根据Spark目前的策略,存储可以在内存中,也可以在磁盘上,但是它会尽可能多地使用内存。
Dataframes以StructType的形式组成schema,可用于内省。Dataframes还包括一个printSchema()方法,以更快地调试Dataframes。理论已经足够——让我们开始实践!
图3.2一个完整的dataframe(模式和数据):dataframe被实现为一个行数据集(dataset <Row>)。每个列都有名字和类型两部分。数据本身在分区中。本演示基于3.2.1节中使用的威克郡餐馆数据集。
3.1.2不变性不是骂人的话
数据缓存以及数据集和rdd(在3.4节中讨论)被认为是不可变的存储。不变性被定义为不可改变。当应用于一个对象时,它意味着它的状态在它被创建之后不能被修改。
我认为这个术语是违反直觉的。当我第一次开始使用Spark时,我很难接受这样的概念:让我们使用这个为数据处理而设计的杰出技术,但数据是不可变的。你希望我处理数据,但它不能改变?
图3.3给出了一个解释:数据的第一种状态是不可变的;然后开始修改它,但是Spark只存储转换的步骤,而不是每个步骤转换后的数据。让我换一种说法:Spark以不可变的方式存储数据的初始状态,然后保留菜谱(转换的列表)。中间数据不存储。第四章深入探讨了转换。
图3.3 一个典型的Spark流:数据最初以不可变的方式存储。存储的是转换的方法,而不是各个阶段的数据。
当添加节点时,原因就更容易理解了。图3.3展示了一个典型的具有一个节点的spark 流,而图3.4展示了具有更多节点的spark流。
图3.4 在添加节点时,想象一下数据同步的复杂性。通过只保留菜谱(转换列表),减少了对存储的依赖并提高了可靠性(弹性)。阶段2中不存储任何数据。
当你以分布式的方式思考时,不变性变得非常重要。在存储方面,您有两种选择:
●存储数据,每次修改都立即在每个节点上完成,就像在关系数据库中一样。
●保持节点上的数据同步,只与各个节点共享转换列表。
Spark使用第二种解决方案,因为它同步转换列表比同步每个节点上的所有数据更快。第4章介绍了Catalyst的优化。Catalyst是负责Spark处理优化的酷孩子。不变性和转换配方是这个优化引擎的基础。
尽管Spark出色地使用了不变性作为优化数据处理的基础,但是在开发应用程序时,您不必过多地考虑它。Spark与任何优秀的操作系统一样,将为您处理这些资源。
3.2通过示例使用dataframe
没有什么比以一个小例子开始更好的了。你在第1章和第2章中接入了文件数据。但是之后会发生什么呢?
在本节中,您将执行两个简单的接入操作。然后,您将研究它们的模式和存储,以便了解在应用程序中使用dataframes时的行为。第一张是北卡罗来纳州威克郡的餐馆名单。第二个数据集包括北卡罗莱纳州达勒姆县的餐馆。然后您将转换数据集,以便您可以通过union将它们组合起来。
这些是您作为Spark开发人员将执行的关键操作,因此理解它们背后的原则将为您提供所需的基础。图3.5说明了这个过程。
图3.5 本章的实验将演示:文件数据的接入,通过转换对dataframes的修改,dataframes的union,以及dataframes内容的显示。
union操作之后的目标(和最终的)dataframe,需要在执行转换之后拥有相同的模式,如图3.6所示
图3.6 源dataframe和目标dataframe的映射
3.2.1 简单的CSV摄取后的dataframe
在本节中,将首先获取数据,然后查看dataframe中的数据,以理解模式。这个过程是您理解Spark工作方式的一个重要步骤。
示例的目标是标准化数据集,使其符合特定的标准,正如您在图3.6中看到的那样。我打赌你喜欢去餐馆。也许不是每天,也许不是每一种,但你们每个人都有自己的偏好:食物的类型,离家的距离,公司,噪音水平,等等。Yelp或OpenTable等网站拥有丰富的数据集,但让我们来探索一些开放数据。图3.7演示了本例中的过程。
第一个数据集来自北卡罗来纳州的维克县,网址是http://mng.bz/5AM7。它包含了该县的餐馆列表。数据可以直接从http://mng.bz/Jz2P下载。
现在,执行dataframe的接入和转换,以便它与输出匹配(通过重命名和删除列);然后将范围扩大到数据分区。在接收和转换数据时,还将计算记录的数量。图3.8演示了这个映射。
图3.8将维克县餐馆的输入dataframe映射到目标dataframe。这些小垃圾桶表示您要丢弃的字段。
实验:可以从GitHub下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03.lab200_ingestion_schema_manipulation中的实验#200。
获得的结果是一个餐馆列表,与图3.8中定义的映射相匹配。请注意,以下输出已被更改以适合本页:
因为分布在多行上的记录有点难以阅读,所以我将记录添加为图3.9中的屏幕截图。
图3.9维克郡餐馆数据集的前五行
要显示这些数据集(即dataframe),您的代码将如下所示:
//静态函数是Spark中的一个强大工具;你将在第13章中学习更多,附录G提供了参考。
到目前为止,这种数据接入类似于第1章数据接入的一个简单的图书列表,和第2章数据接入的一个作者列表。数据接入总是以同样的方式进行,第7、8和9章提供了进一步的细节。让我们再深入一点dataframe。可以使用printSchema()将模式打印到标准输出(stdout)。结果如下:
附录H提供了关于类型的更多细节。可以使用如下方法:
df.printSchema();
有一种简单的方法可以计算dataframe中记录的数量。假设你想要显示这个:
We have 3440 records.
你只需使用以下方法:
System.out.println("We have " + df.count() + " records.");
本节的目标是合并两个dataframe,就像执行两个表的SQL union一样。。为了使union有效,需要在两个dataframe中使用类似名称的列。要做到这一点,您可以很容易地想象第一个数据集的模式也需要修改了。它看起来是这样的:
让我们来完成这个转换。注意方法链接的强大使用。如第2章所述,Java api可以使用方法链接,如:
SparkSession .builder().appName(…).master(…).getOrCreate()
而不是在每一步创建一个对象并将其传递给下一个操作。
您将使用dataframe的四种方法和两个静态函数。您可能熟悉静态函数:它们是那些“分组”在类中,但不需要实例化类的函数。
方法很容易理解:它们被附加到对象本身。当您直接处理列中的值时,静态函数非常有用。当你读这本书的时候,你会看到这些静态函数越来越多的使用,它们在第13章和附录G中有更详细的描述。
如果您没有找到实现您想要的功能的函数(例如,特定的转换或对现有库的调用),那么您可以编写自己的函数。这些函数称为用户定义函数(udf),您将在第16章中学习。
让我们看看你现在需要的方法和函数:
-
withColumn()方法—从表达式或列创建新列。
-
withColumnRenamed()方法—重命名一个列。
-
col()方法——从列的名称获取列。有些方法将列名作为参数,有些方法需要一个列对象。
-
drop()方法—从dataframe中删除一个列。此方法接受列对象或列名的实例。
-
lit()函数——创建一个带有值的列;字面意思,字面值。
-
concat()函数-连接一组列中的值。
你现在可以看到代码:
您可能需要为每个记录提供唯一的标识符。可以创建id列,并通过连接以下内容来构建它:
-
1 state字段值
-
2 下划线(_)
-
3 county字段值
-
4 下划线(_)
-
5 数据集中的标识符
代码是这样的:
df = df.withColumn("id", concat( df.col("state"), lit("_"), df.col("county"), lit("_"), df.col("datasetId")));
最后,您可以显示5条记录并打印schema:
System.out.println("*** Dataframe transformed"); df.show(5); df.printSchema();
3.2.2 数据存储在分区中
(未完待续......) 欢迎关注公众号,及时获得最新翻译内容: