Spark Core(十九)Spark性能的调优
- 资源调优
- 就是多分配内存和
core
- 就是多分配内存和
- 更改高效的序列化方法,
kyro
(减少内存开销) -
优化数据结构(减少内存开销)
- 优先使用数组,而不是集合类。优先使用字符串。尽可能少的使用包装类.
- 业务允许的情况下尽量使用
id
作为唯一键,不用String
类型 - 尽量少用对象嵌套结构,可以用
Json
串来代替对象嵌套结构
-
对
RDD
进行持久化与Checkpoint
- 如果一个
RDD
被多次进行Action
操作和Transformation
操作,那么我们为了提高性能就可以将这个RDD
进行持久化。调用RDD
的cache
方法和persist
方法来进行持久化 -
指定序列化的持久化级别
持久化级别 解释 MEMORY_ONLY 将没有序列化的 java
对象持久化到内存中,spark
的默认持久化级别,如果有的分区内存不够就不会在该分区上持久化MEMORY_AND_DISK 将没有序列化的 java
对象持久化到内存中,当内存中不够用的时候,将一部分数据持久化到磁盘中MEMORY_ONLY_SER 将RDD 存储
为序列化的Java对象(每个分区一个字节数组)。与反序列化对象相比,这通常更节省空间,特别是在使用快速序列化器时,但是读起来更需要cpu。MEMORY_AND_DISK_SER 与 MEMORY_ONLY_SER
类似,但超出内存的数据溢出到磁盘中DISK_ONLY 只将 RDD
分区存储在磁盘上。MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 以副本的方式持久化数据
- 如果一个
-
Java
虚拟机的垃圾回收机制的调优- 如果在持久还
RDD
的时候,持久化了大量的数据,那么Java
虚拟机的垃圾回收就可能出现了瓶颈,因为Java
垃圾回收机制是定期对垃圾对象进行回收,使得内存得以释放。频繁的垃圾回收会影响Spark作业的性能,因为Java的垃圾回收机制其实就是一条线程,在这个线程执行的时候,其他所有的线程都变为等待状态,垃圾回收线程执行完了以后其他线程才会继续执行,这样性能就很受影响 - 默认情况下
Executor
内存空间被划分了两块。一块分到60%
的空间,用来缓存RDD
数据,另一块分为剩下的40%
空间,用来分配Task,存放它运行时动态创建的对象。这种情况下,40%
空间不足的时候,就会频繁进行垃圾回收,而且还有可能造成Spark
作业的运行异常。在实际业务中可以根据具体情况来调整这缓存RDD区域内存的占比大小,调整方式为:new SparkConf().set("spark.storage,memoryFraction","0.5")
,使得这两块区域各占50%
Executor
中的Task
运行时的内存空间又被分为两块空间,一块叫做老年代,存放长时间存活的对象,一块叫做新生代,存放短时间存活的对象。新生代又被分为三块空间,第一块叫做Eden
区域,第二块叫做Survivor1
区域,第三块叫做survivor2
区域。在动态创建对象的时候首先会将对象先放入Eden和survivor1
区域中,survivor2
是备用区域。当Eden
区域满了以后就会触发minor gc
操作,会回收新生代中不被使用的对象,剩下的对象就会被移入survivor2
区域中,这时survivor2
和survivor1
就会互换角色,survivor1
变成了备用区域。如果在新生代中多次触发垃圾回收以后,有的对象长时间不会被回收,那么就说明他是一个长时间存活的对象,就会被移到老年代中。如果Eden
区域分的不够大,当发生minor gc
的时候存活的对象会被移入备用区域,如果这个时候备用区域满了,短时间存活的对象就有可能被分到老年区域中,当老年区域满了的时候就出触发full gc
操作来回收老年代的对象,在回收的时候其他线程停止,影响性能。-
提高Spark作业并行度
- 设置并行度为集群cpu的2~3倍的Task数量,
new SparkConf("spark.default.parallelism","10")
,HDFS
上每一个block
就是一个Partition
。
- 设置并行度为集群cpu的2~3倍的Task数量,
-
广播共享数据
- 为每个节点都广播一份共享数据。
val broadcastMyData = sc.broadcast("myData")
,以后再次用到这个变量的时候就直接用广播以后的变量即可。
- 为每个节点都广播一份共享数据。
-
数据本地化
- 数据本地化就是说数据与计算它的代码是分开的,那么最后肯定他们俩需要在一起才能完成计算任务,这也样就出现了网络
IO
,通常来说传输计算代码要比传输数据快的多,因为代码数据量小,数据的数据量大。Spark
也是基于这种机制来进行Task
的调度的。 - 数据本地化级别,
-
PROCESS_LOCAL
:数据与计算代码在一个Jvm
进程中 -
NODE_LOCAL
:数据与计算代码在一个节点上,但是不在一个进程中,比如说不在一个Executor
上 -
NO_PREF
:数据从哪来性能都是一样的 -
RACK_LOCAL
:数据和代码在一个机架上 -
ANY
:数据可能在任意地方,比如网路可达的任意机器
-
- 在
TaskScheduleImpl
提交Task
到Executor
上去执行的时候,Task
在等待一段时间以后,如果Executor
一直没有core
被释放,那么Task
的数据本地化级别就会依次放大一个级别,这个等待时间是可以调整的,这就是一个调优点
- 数据本地化就是说数据与计算它的代码是分开的,那么最后肯定他们俩需要在一起才能完成计算任务,这也样就出现了网络
-
shuffle
性能优化-
spark.shuffle.file.buffer
- 默认值:
32k
- 参数说明:该参数用于设置
shuffle write task
的BufferedOutputStream
的buffer
缓冲大小。将数据写到磁盘文件之前,会先写入buffer
缓冲中,待缓冲写满之后,才会溢写到磁盘。 - 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如
64k
),从而减少shuffle write过程
中溢写磁盘文件的次数,也就可以减少磁盘IO
次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%
的提升。
- 默认值:
-
spark.reducer.maxSizeInFlight
- 默认值:48m
- 参数说明:该参数用于设置
shuffle read task
的buffer
缓冲大小,而这个buffer
缓冲决定了每次能够拉取多少数据。 - 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如
96m
),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%
的提升。
-
spark.shuffle.io.maxRetries
- 默认值:3
- 参数说明:
shuffle read task从shuffle write task
所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 - 调优建议:对于那些包含了特别耗时的
shuffle
操作的作业,建议增加重试最大次数(比如60
次),以避免由于JVM
的full gc
或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle
过程,调节该参数可以大幅度提升稳定性。
-
spark.shuffle.io.retryWait
- 默认值:
5s
- 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是
5s
。 - 调优建议:建议加大间隔时长(比如
60s
),以增加shuffle操作的稳定性。
- 默认值:
-
spark.shuffle.memoryFraction
- 默认值:
0.2
- 参数说明:该参数代表了
Executor
内存中,分配给shuffle read task
进行聚合操作的内存比例,默认是20%
。 - 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给
shuffle read
的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%
左右。
- 默认值:
-
spark.shuffle.manager
- 默认值:sort
- 参数说明:该参数用于设置
ShuffleManager
的类型。Spark 1.5
以后,有三个可选项:hash
、sort
和tungsten-sort
。HashShuffleManager
是Spark 1.2
以前的默认选项,但是Spark 1.2
以及之后的版本默认都是SortShuffleManager
了。tungsten-sort
与sort
类似,但是使用了tungsten
计划中的堆外内存管理机制,内存使用效率更高。 - 调优建议:由于
SortShuffleManager
默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager
就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass
机制或优化的HashShuffleManager
来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug
。
-
spark.shuffle.sort.bypassMergeThreshold
- 默认值:200
- 参数说明:当
ShuffleManager为SortShuffleManager
时,如果shuffle read task
的数量小于这个阈值(默认是200
),则shuffle write
过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager
的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。 - 调优建议:当你使用
SortShuffleManager
时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task
的数量。那么此时就会自动启用bypass
机制,map-side
就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write
性能有待提高。
-
spark.shuffle.consolidateFiles
- 默认值:false
- 参数说明:如果使用
HashShuffleManager
,该参数有效。如果设置为true
,那么就会开启consolidate
机制,会大幅度合并shuffle write
的输出文件,对于shuffle read task
数量特别多的情况下,这种方法可以极大地减少磁盘IO
开销,提升性能。 - 调优建议:如果的确不需要
SortShuffleManager
的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager
参数手动指定为hash,使用HashShuffleManager
,同时开启consolidate
机制。在实践中尝试过,发现其性能比开启了bypass
机制的SortShuffleManager
要高出10%~30%。
-
- 如果在持久还