数据倾斜

数据倾斜

转载声明

本文大量内容系转载自以下文章,有删改,并参考其他文档资料加入了一些内容:

0x01 前言

1.1 绪论

数据倾斜是大数据领域绕不开的拦路虎,当你所需处理的数据量到达了上亿甚至是千亿条的时候,数据倾斜将是横在你面前一道巨大的坎。

迈的过去,将会海阔天空!迈不过去,就要做好准备:很可能有几周甚至几月都要头疼于数据倾斜导致的各类诡异的问题。

1.2 郑重声明

  • 话题比较大,技术要求也比较高,笔者尽最大的能力来写出自己的理解,写的不对和不好的地方大家一起交流。
  • 有些例子不是特别严谨,一些小细节对文章理解没有影响,不要太在意。(比如我在算机器内存的时候,就不把Hadoop自身的进程算到使用内存中)

1.3 文章结构

  1. 先大致解释一下什么是数据倾斜
  2. 再根据几个场景来描述一下数据倾斜产生的情况
  3. 详细分析一下在Hadoop和Spark中产生数据倾斜的原因
  4. 如何解决(优化)数据倾斜问题?

0x02 什么是数据倾斜

简单的讲,数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了集群中的一台或者几台机器上计算,而集群中的其他节点空闲。这些倾斜了的数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。

2.1 关键字:数据倾斜

相信大部分做数据的童鞋们都会遇到数据倾斜,数据倾斜会发生在数据开发的各个环节中,比如:

  • 用Hive算数据的时候reduce阶段卡在99.99%
  • 用SparkStreaming做实时算法时候,一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低。

这些问题经常会困扰我们,辛辛苦苦等了几个小时的数据就是跑不出来,心里多难过啊。

2.2 关键字:千亿级

为什么要突出这么大数据量?先说一下笔者自己最初对数据量的理解:

数据量大就了不起了?数据量少,机器也少,计算能力也是有限的,因此难度也是一样的。凭什么数据量大就会有数据倾斜,数据量小就没有?

这样理解也有道理,但是比较片面,举两个场景来对比:

  • 公司一:总用户量1000万,5台64G内存的的服务器。
  • 公司二:总用户量10亿,1000台64G内存的服务器。

两个公司都部署了Hadoop集群。假设现在遇到了数据倾斜,发生什么?

  • 公司一的数据分析师在做join的时候发生了数据倾斜,会导致有几百万用户的相关数据集中到了一台服务器上,几百万的用户数据,说大也不大,正常字段量的数据的话64G还是能轻松处理掉的。
  • 公司二的数据分析师在做join的时候也发生了数据倾斜,可能会有1个亿的用户相关数据集中到了一台机器上了(相信我,这很常见)。这时候一台机器就很难搞定了,最后会很难算出结果。

0x03 数据倾斜长什么样

下面会分几个场景来描述一下数据倾斜的特征,方便读者辨别。由于Hadoop和Spark是最常见的两个计算平台,下面就以这两个平台说明:

3.1 Hadoop中的数据倾斜

3.1.1 概述

Hadoop中直接贴近用户使用使用的时Mapreduce程序和Hive程序,虽说Hive最后也是用MR来执行(至少目前Hive内存计算并不普及),但是毕竟写的内容逻辑区别很大,一个是程序,一个是Sql,因此这里稍作区分。

3.1.2 表现

  • Hadoop中的数据倾斜主要表现在、Reduce阶段卡在99.99%,一直不能结束。

  • 这里如果详细的看日志或者和监控界面的话会发现:

    • 有一个多几个Reduce卡住
    • 各种container报错OOM
    • 异常的Reducer读写的数据量极大,至少远远超过其它正常的Reducer
  • 伴随着数据倾斜,会出现任务被kill等各种诡异的表现。

3.1.2 经验

Hive的数据倾斜,一般都发生在Sql中group byjoin on上,而且和数据逻辑绑定比较深。

3.2 Spark中的数据倾斜

Spark中的数据倾斜也很常见,这里包括Spark Streaming和Spark Sql,表现主要有下面几种:

  • Executor lost,OOM,Shuffle过程出错
  • Driver OOM
  • 单个Executor执行时间特别久,整体任务卡在某个阶段不能结束
  • 正常运行的任务突然失败

注意,在Spark streaming程序中,数据倾斜更容易出现,特别是在程序中包含一些类似sql的join、group这种操作的时候。 因为Spark Streaming程序在运行的时候,我们一般不会分配特别多的内存,因此一旦在这个过程中出现一些数据倾斜,就十分容易造成OOM

0x04 数据倾斜的原理

4.1 数据倾斜产生原因概述

我们以Spark和Hive的使用场景为例。

他们在做数据运算的时候会设计到,count distinctgroup byjoin on等操作,这些都会触发Shuffle动作。一旦触发Shuffle,所有相同key的值就会被拉到一个或几个Reducer节点上,容易发生单点计算问题,导致数据倾斜。

4.2 Shuffle与数据倾斜

Shuffle是一个能产生奇迹的地方,不管是在Spark还是Hadoop中,它们的作用都是至关重要的。关于Shuffle的原理,这里不再讲述,看看Hadoop相关的论文或者文章理解一下就ok。这里主要针对,在Shuffle如何产生了数据倾斜。

Hadoop和Spark在Shuffle过程中产生数据倾斜的原理基本类似。如下图:

数据倾斜
大部分数据倾斜的原理就类似于下图,很明了,因为数据分布不均匀,导致大量的数据分配到了一个节点。

4.3 数据本身与数据倾斜

我们举一个例子,就说数据默认值的设计吧,假设我们有两张表:

  • user(用户信息表):userid,register_ip
  • ip(IP表):ip,register_user_cnt

这可能是两个不同的人开发的数据表。如果我们的数据规范不太完善的话,会出现一种情况:

  • user表中的register_ip字段,如果获取不到这个信息,我们默认为null;
  • 但是在ip表中,我们在统计这个值的时候,为了方便,我们把获取不到ip的用户,统一认为他们的ip为0。

两边其实都没有错的,但是一旦我们做关联了,这个任务会在做关联的阶段,也就是sql的on的阶段卡死。

4.4 业务逻辑与数据倾斜

数据往往和业务是强相关的,业务的场景直接影响到了数据的分布。

再举一个例子,比如就说订单场景吧,我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。

然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。

0x05 解决数据倾斜思路

5.1 概述

数据倾斜的产生是有一些讨论的,解决它们也是有一些讨论的,本章会先给出几个解决数据倾斜的思路,然后对Hadoop和Spark分别给出一些解决数据倾斜的方案。

注意: 很多数据倾斜的问题,都可以用和平台无关的方式解决,比如更好的数据预处理, 异常值的过滤等,因此笔者认为,解决数据倾斜的重点在于对数据设计和业务的理解,这两个搞清楚了,数据倾斜就解决了大部分了。

5.2 解决思路

解决数据倾斜有这几个思路:

5.2.1 业务逻辑

我们从业务逻辑的层面上来优化数据倾斜,比如上面的两个城市做推广活动导致那两个城市数据量激增的例子,我们可以单独对这两个城市来做count,最后和其它城市做整合。

5.2.2 程序层面

比如说在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce任务。

我们可以先group,再在外面包一层count,就可以了。

5.2.3 调参方面

Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。

5.3 从业务和数据上解决数据倾斜

很多数据倾斜都是在数据的使用上造成的。我们举几个场景,并分别给出它们的解决方案。

数据分布不均匀:

前面提到的“从数据角度来理解数据倾斜”和“从业务计角度来理解数据倾斜”中的例子,其实都是数据分布不均匀的类型,这种情况和计算平台无关,我们能通过设计的角度尝试解决它。

  • 有损的方法:
    • 找到异常数据,比如ip为0的数据,过滤掉
  • 无损的方法:
    • 对分布不均匀的数据,单独计算
    • 先对key做一层hash,先将数据随机打散让它的并行度变大,再汇集
  • 数据预处理

0x06 解决数据倾斜具体方法

6.1 MapReduce

6.1.1 大量相同key没有combine就传到Reducer

  • combiner函数
    • 思想:提前在map进行combine,减少传输的数据量

    • 在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。

      如果导致数据倾斜的key 大量分布在不同的mapper的时候,这种方法就不是很有效了。

6.1.2 导致数据倾斜的key 大量分布在不同的mapper

  • 局部聚合加全局聚合。

    • 思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理
    • 该方法进行两次mapreduce:
      • 第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
      • 第二次mapreduce,去掉key的随机前缀,进行全局聚合。
    • 这个方法进行两次mapreduce,性能稍差。
  • 增加Reducer
    思想:增加Reducer,提升并行度

  • 实现custom partitioner
    思想:根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

6.2 Hive

6.2.1 group by

注:group by 优于distinct group

  • 情形:group by 维度过小,某值的数量过多
  • 后果:处理某值的reduce非常耗时

6.2.1 count(distinct)

distinct count(distinct xx)

  • 情形:某特殊值过多
  • 后果:处理此特殊值的reduce耗时

6.2.3 join

  • 情形1:小表与大表join,但较小表key集中
    后果:shuffle分发到某一个或几个Reducer上的数据量远高于平均值

  • 情形2:大表与大表join,但是分桶的判断字段0值或空值过多
    后果:这些空值都由一个Reducer处理,非常慢

6.3 Spark

5.4 Hadoop

  • mapjoin
  • count distinct的操作,先转成group,再count
  • 万能膏药:hive.groupby.skewindata=true
  • left semi jioin的使用
  • 设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率)

5.5 Spark

  • mapjoin
  • 设置rdd压缩
  • 合理设置driver的内存
  • Spark Sql中的优化和Hive类似,可以参考Hive

0xFF 总结

数据倾斜的坑还是很大的,如何处理数据倾斜是一个长期的过程,希望本文的一些思路能提供帮助。