Spark零碎知识点
RDD的概念:
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
个人注释(非官方):
不可变:每一个算子计算完的结果不可变,结果再进行其它计算会得到新的一个结果。
可分区:数据可以存储在不同的分区中,算子作用到各个分区上。
并行计算:作用到各个分区上的算子计算可并行(同时)进行。
自动容错:为了提高计算效率,在shuffle前实现容错只需要把丢失的分区对应的父RDD分区进行重新计算即可,但如果是shuffle后的分区数据丢失,此时就需要将整个父RDD分区调用shuffle算子进行重新计算。RDD的容错就是指的是RDD分区的恢复过程,和Worker宕机无关。
位置感知:就是把具体的计算逻辑传到对应的数据节点上进行计算,可以有效避免发生大量网络IO。
可伸缩性:即自动容错。
RDD的属性
1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
groupByKey和reduceByKey的区别
reduceByKey会先进行局部聚合,再进行全局聚合,这样会在全局聚合时减少网络IO,起到优化作用。所以,能用reduceByKey的情况,尽量用reduceByKey。
checkpoint的应用场景:
在应用程序执行过程中,有时候某些RDD的数据需要在其他地方多次用到(包括其他job中用到),
为了使得整个依赖链条不至于很长导致执行缓慢,可以用checkpoint来缩短依赖链条。
最好把数据checkpoint到HDFS,保证了数据的安全性,便于在用数据的时候进行拉取。
执行过程:
在代码层面是这样执行的,如果用到某个RDD的数据的时候,首先会检查是否做了缓存,如果做了缓存,会直接从缓存里面取数据,
如果没有做缓存,则判断是否做了checkpoint,如果做了checkpoint,则从checkpoint的指定路径下获取数据,
如果没有checkpoint,只能从新计算得到数据。
checkpoint的具体实现步骤:
1、设置一个checkpoint的目录
sc.setCheckpointDir(“hdfs://node01:9000/cp-20190105-1”)
2、把要checkpoint的RDD的数据进行cache
rdd.persist
3、checkpoint
rdd.checkpoint
最后在调用action算子的时候才会统一地进行cache和checkpoint,
而且该实现步骤是用于离线实现过程中的
查看是否做了checkpoint:rdd.isCheckpointed
查看checkpoint的存储目录:rdd.getCheckpointFile
map和mapPartitions的区别:
map是处理RDD里的每个元素,mapPartitions是用于处理RDD的每个分区的
map和foreach的区别:
1、map是有返回值的,foreach没有返回值
2、map常用于将某个RDD做元素的处理,而foreach常用于作为结果的输出到其他的存储系统中
3、map是属于transformation,foreach属于action
foreach和foreachPartition区别:
foreach是针对于RDD的每个元素来操作的,foreachPartition是针对于RDD的每个分区进行操作的
从优化层面讲:foreachPartition用于存储大量结果数据的场景,可以一个分区对应一个数据库的连接,这样就可以减少很多数据库的连接
rdd.foreachPartition(partition => {
val conn = … // 数据库连接
partition.foreach( // 进行存储)
})
stage划分过程
广播变量:
如果需要将Driver端的某个变量的值在Executor端多次使用,
可以将Driver端的某个变量的值以广播的方式传给多个Executor端,
Exeutor端在使用该值的时候就可以不经过网络IO从Driver端获取,而是直接从本地的缓存读取该值即可
这样既可以减少网络IO,又可以节省缓存(因为一个Executor只存一份广播变量就可以了),
广播过来的值会保存到Executor端的BlockManager
注意:
广播变量不可以广播RDD,因为RDD不会封装具体的值,而广播变量只能广播确切的值。
广播变量的值不易太大,如果太大,会把Executor端的缓存占用太多而导致计算时的内存太少而导致计算速度太慢或出现oom。
广播变量只能在Driver端定义,不能再Executor端定义。
Spark的Shuffle过程:
shuffle操作,是在Spark操作中调用了一些特殊的算子才会触发的一种操作,
shuffle操作,会导致大量的数据在不同的节点之间进行传输,
因此,shuffle过程是Spark中最复杂、最消耗性能的一种操作
比如:reduceByKey算子会将上一个RDD中的每个key对应的所有value都聚合成一个value,然后生成一个新的RDD,
新的RDD的元素类型就是<key, value>的格式,每个key对应一个聚合起来的value,
在这里,最大的问题在于,对于上一个RDD来说,并不是一个key对应的所有的value都在一个partition中的,
更不太可能key的所有value都在一个节点上,
对于这种情况,就必须在集群中将各个节点上同一个key对应的values统一传输到一个节点上进行聚合处理,
这个过程势必会发生大量的网络IO。
shuffle过程中会发生shuffle write和shuffle read,
shuffle write:在map task端会发生shuffle write,把要shuffle的数据写到磁盘的过程,为什么要写到磁盘?
主要是为了避免shuffle的数据太大而占用内存太大导致oom,其次把数据存储到磁盘保证了数据的安全性
shuffle read:在reduce task端发生shuffle read,是指下游RDD读取上游RDD的过程,也就是reduce task读取并合并的过程
在进行一个key对应的values的聚合时,
首先,上一个stage的每个map task就必须保证将自己处理的当前分区中的数据相同key写入一个分区文件中,
可能会多个不同的分区文件,
接着下一个stage的reduce task就必须从上一个stage的所有task所在的节点上,
将各个task写入的多个分区文件中找到属于自己的分区文件,
然后将属于自己的分区数据拉取过来,
这样就可以保证每个key对应的所有values都汇聚到一个节点上进行处理和聚合,
这个过程就称之为shuffle!!!
shuffle过程中的分区排序问题
默认情况下,shuffle操作是不会对每个分区中的数据进行排序的
如果想要对每个分区中的数据进行排序,可以使用三种方法:
1、使用mapPartitions算子把每个partition取出来进行排序
2、使用repartitionAndSortWithinPartitions(该算子是对RDD进行重分区的算子),在重分区的过程中同时就进行分区内数据的排序
3、使用sortByKey对所有分区的数据进行全局排序
以上三种方法,mapPartitions代价比较小,因为不需要进行额外的shuffle操作,
repartitionAndSortWithinPartitions和sortByKey可能会进行额外的shuffle操作,因此性能并不是很高
会导致shuffle的算子
1、byKey类的算子:比如reduceByKey、groupByKey、sortByKey、aggregateByKey、combineByKey
2、repartition类的算子:比如repartition(少量分区变成多个分区会发生shuffle)、repartitionAndSortWithinPartitions、coalesce(需要指定是否发生shuffle)、partitionBy
3、join类的算子:比如join(先groupByKey后再join就不会发生shuffle)、cogroup
注意:首先对于上述操作,能不用shuffle操作,就尽量不用,尽量使用不发生shuffle的操作。
其次,如果使用了shuffle操作,那么肯定要进行shuffle的调优,甚至是解决遇到的数据倾斜问题。
shuffle操作是spark中唯一最消耗性能的过程
因此也就成了最需要进行性能调优的地方,最需要解决线上报错的地方,也就是唯一可能出现数据倾斜的地方
为了实时shuffle操作,spark才有stage的概念,在发生shuffle操作的算子中,需要进行stage的划分
shuffle操作的前半部分,属于上一个stage的范围,通常称之为map task,
shuffle操作的后半部分,属于下一个stage的范围,通常称之为reduce task,
其中map task负责数据的组织,也就是将同一个key对应的value都写入同一个下游task对应的分区文件中,
其中reduce task负责数据的聚合,也就是将上一个stage的task所在的节点上,将属于自己的各个分区文件都拉取过来进行聚合
map task会将数据先保存在内存中,如果内存不够时,就溢写到磁盘文件中,
reduce task会读取各个节点上属于自己的分区磁盘文件到自己节点的内存中进行聚合。
由此可见,shuffle操作会消耗大量的内存,因为无论是网络传输数据之前还是之后,
都会使用大量内存中数据结构来实施聚合操作,
在聚合过程中,如果内存不够,只能溢写到磁盘文件中去,
此时就会发生大量的网络IO,降低性能。
此外,shuffle过程中,会产生大量的中间文件,也就是map side写入的大量分区文件,
这些文件会一直保留着,直到RDD不再被使用,而且被gc回收掉了,才会去清理中间文件,
这主要是为了:如果要重新计算shuffle后RDD,那么map side不需要重新再做一次磁盘写操作,
但是这种情况下,如果在应用程序中一直保留着对RDD的引用,
导致很长的时间以后才会进行回收操作,
保存中间文件的目录,由spark.local.dir属性指定
所以,spark性能的消耗体现在:内存的消耗、磁盘IO、网络的IO
task的生成,一定是在stage范围内,不会跨越stage
task的数量可以这样计算:RDD分区的数量乘以stage的数量(必须是没有重分区的操作)