spark学习(五):shuffle以及内存管理机制
目录
1.2 Shuffle分为shuffle和sortShuffle
1. shuffle详解
之前说RDD宽窄依赖,划分stage时说到一个RDD是否有多个子RDD判断其是窄依赖还是宽依赖,这其中宽依赖:一个RDD有多个子RDD,这个过程中有shuffle过程;
Stage中pipeline计算模型时提到数据什么时候落地:shuffle或者checkpoint
1.1 那么到底什么时shufffle?
我们之前说到reduceByKey会有shuffle过程,那我们从reduceByKey为例开始
reduceByKey的含义?
reduceByKey会将一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key, value>的形式,这样每一个key对应一个聚合起来的value
问题:
每一个key对应的value不一定都在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,他的partition很可能分布在各个节点上
如何聚合?
- Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区中的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件
- Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key多对应的value都会汇聚到同一个节点上去处理和聚合
1.2 Shuffle分为shuffle和sortShuffle
1.2.1 shuffle普通机制
shuffle 普通机制运行原理图
一个executor中的每个task都有属于自己的buffer缓冲区
执行流程
- Map task 处理完数据后,将结果写入到buffer缓冲区(与reduce task数量一致),每个buffer默认32k
- 满了之后溢写到磁盘,每个buffer对应一个磁盘小文件。
- Reduce task到不同的节点去拉取数据自己的数据
产生的小文件的个数:M(map task) * R(reduce task )
问题:
产生磁盘小文件太多
- 写磁盘文件的对象多
- 拉取数据读磁盘文件对象多
- 创建对象多,容易造成gc,gc还不满足内存使用,就会OOM
OOM问题:
- Driver端回收RDD数据
- Executor 端创建对象非常多,可能会有OOM(0.2 task内存)
- Executor 端拉取shuffle数据,如果5个task一次拉取的数据量在Executor0.2的shuffle内存中放不下
- Executor端对RDD进行缓存或者广播变量的RDD数据量比较大(0.6内存)
怎么优化解决问题?
1.2.2 shuffle合并机制
优化后的HashShuffleManager 运行原理图
一个executor中的task共享一个buffer缓冲区
产生的小文件的个数:C(core) * R(reduce task)
其他的和普通机制一样
1.2.3 SortShuffle普通运行机制
SortShuffleManager普通运行机制原理图
执行流程:
- map task 将处理的结果写入一个5M的内存结构中
- SortShuffle中会估算这个内存结构大小,当下一次结果放不下时,会申请2*估计-当前
- 如果申请的到内存,继续往数据结构中写数据,如果申请不到,溢写磁盘,每批次是1万条溢写,溢写过程中会有排序。
- 溢写的数据在磁盘上最终形成两个文件:一个索引文件一个数据文件
- reduce 拉取数据首先解析索引文件,再去拉取数据
产生的小文件的个数 2*M(map task)
SortShuffleManager bypass运行机制原理图
产生的小文件个数 2 * M(map task)
与普通运行机制相比,少了排序功能;
Bypass运行机制的处罚条件:当Shuffle reduce task个数小于spark.shuffle.sort.bypassMergeThreshold (默认200)参数的值,会开启bypass机制。
1.3 shuffle文件寻址
- map task处理完的数据,将结果和数据位置封装到MapStatus对象中,通过MapOutputTrackerWorker汇报给Driver中的MapOutputTrackerMaster。Driver中掌握了数据位置。
- reduce 端处理数据,首先向本进程中的MapOutputTrackerWorker要磁盘文件位置,再向Driver中的MapOutputTrackerMaster要磁盘数据位置,Driver返回磁盘数据位置。
- reduce 拿到数据位置之后,通过BlockManager中的ConnectionManager连接数据所在的节点,连接上之后,通过BlockManager中的BlockTransferService拉取数据
- BlockTransferService拉取数据默认启动5个task,这5个task默认一次拉取的数据量不能超过48M。
- 拉取过来的数据放在Executor端的shuffle聚合内存中(spark.shuffle.memeoryFraction 0.2)。
- 如果5个task一次拉取的数据放不到shuffle内存中会有OOM,如果放下一次,不会有OOM,以后放不下的会放磁盘。
shuffle过程中的一些参数:
参照官网:http://spark.apache.org/docs/2.2.0/configuration.html#shuffle-behavior
MapOutputTracker:磁盘小文件
MapOutputTrackerMaster(Driver端)
MapOutputTrackerWorker(Executor端)
BlockManager:块管理者
BlockManagerMaster(Driver端):
DiskStore:管理磁盘数据
MemoryStroe:管理内存数据
ConnectionManager:负责连接其他BlockManager
BlockTransferService:负责拉取数据
BlockManagerSlaves(Executor端):
DiskStore:管理磁盘数据
MemoryStroe:管理内存数据
ConnectionManager:负责连接其他BlockManager
BlockTransferService:负责拉取数据
寻址过程:
1.4 shuffle调优
shuffle参数调优 提取码:hym5
或者访问该博客:https://www.cnblogs.com/arachis/p/Spark_Shuffle.html
2. spark的内存管理机制
2.1 静态内存管理机制
在spark1.6之前,使用静态管理机制
图片来源于:https://www.cnblogs.com/wzj4858/p/8204282.html
静态管理机制--堆内
静态内存管理图示——堆外
2.2 统一内存管理机制
在spark1.6之后,使用统一内存管理机制
统一内存管理--堆内
统一内存管理--堆外
内存分为3部分:
storage:缓存 60% * 50%
execution:shuffle,join等运行 60% * 50%
other: spark内部的数据运行; 保护oom 40%
动态占用机制
- 如果双方的内存都是要完了,直接溢出磁盘。
- Storage占用的execution的内存,可以被Execution剔除。
- execution占用了Storage的内存,不能被剔除,直到exection占用的内存释放掉。
collect方法,如果数据量太大,直接报错OOM。
2.3 spark关于内存分配的参数
官方网站:http://spark.apache.org/docs/2.2.0/configuration.html#memory-management