Spark RDD概念学习系列之RDD的转换(十)
RDD的转换
Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。
Spark Scala版本的Word Count程序如下:
1
2
3
4
5
|
1 : <span style= "color: #ff0000;" ><strong>val file = spark.textFile( "hdfs://..." )</strong></span>
2 : <strong><span style= "color: #ff0000;" >val counts = file.flatMap(line => line.split( " " ))</span></strong>
3 : <strong><span style= "color: #ff0000;" > .map(word => (word, 1 ))</span></strong>
4 : <strong><span style= "color: #ff0000;" > .reduceByKey(_ + _)</span></strong>
5 :<strong><span style= "color: #ff0000;" > counts.saveAsTextFile( "hdfs://..." )</span></strong>
|
file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?
1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。
spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。
也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。
2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。
3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个MapPartitionsRDD中。
4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。
5)行5:首先会生成一个MapPartitionsRDD,这个RDD会通过调用org.apache.spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。
RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。
RDD的依赖关系(宽依赖和窄依赖)
如,假设,现在如下
所以,
比如,我这里是刚好是4台worker1、worker2、worker3、worker4。还有1台Master。
soga,
1
|
<span style= "color: #ff0000;" ><strong>val file = spark.textFile( "hdfs://..." )<br></strong></span>
|
1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。
spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。
也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。
想要成为高手,一定要多看源码,看上几十遍都太少了,包括看上10个版本的源码。无论是hadoop、还是spark。
1
|
<span style= "color: #ff0000;" ><strong>val counts = file.flatMap(line => line.split( " " ))</strong></span><br> 2 )<strong>行 2 </strong>:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到<strong>MapPartitionsRDD</strong>。<br><br><br>
|
1
|
<span style= "color: #ff0000;" ><strong>.map(word => (word, 1 ))</strong></span><br> 3 )<strong>行 3 </strong>:将第 2 步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1 )的元组。这些元组最终被放到一个<strong>MapPartitionsRDD</strong>中。
|
至此,windows本地,已经完成了。
下面是在网络里了。
注意啦! 分区是计算概念,分片是数据概念。
有4台worker,每台都在自己内存计算。
1
|
<strong>.reduceByKey(_ + _)</strong> |
4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。
总结:
第一个stage :
HadoopRDD -> MapPartitionRDD -> MapPartitionsRDD -> MapPartitionsRDD -> MapPartitionsRDD
第二个stage :
Stage shuffledRDD -> MapPartitionsRDD
本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5723764.html,如需转载请自行联系原作者