关于RDD分区(一)

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上,作用有二:增加并行度和减少通信开销(连接操作),例如下图:

关于RDD分区(一)

RDD分区原则:

RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
*本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N,local[*]则自动判断
*Apache Mesos:默认的分区数为8
*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值

设置分区的个数的方法:

  • 创建RDD时手动指定分区个数
  • 使用reparititon方法重新设置分区个数

自定义分区方法:

Spark提供了自带的HashPartitioner(哈希分区)与RangePartitioner(区域分区),能够满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过提供一个自定义的Partitioner对象来控制RDD的分区方式,从而利用领域知识进一步减少通信开销。

//自定义分区类
class MyPartitioner(numParts:Int) extends Partitioner{
  override def getPartition(key: Any): Int = {
    key.toString().toInt % 10
  }
  override def numPartitions: Int = numParts

}

object TestPartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Partitioner").setMaster("local[1]")
    val sc = new SparkContext(conf)
    //模拟5个分区的数据
    val data = sc.parallelize(1 to 10, 5)
    //根据数据的尾号变为10个分区,分别写到10个文件中
    data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("E:\\tmp\\spark\\testpartitioner")
  }
}

那么问题来了:改变分区数会影响集群的效率吗?