Spark: groupByKey vs. reduceByKey

1. groupByKey

groupByKey shuffles its data with the default HashPartitioner: a key is assigned to partition key.hashCode % number of partitions.
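
A quick sketch of that formula, calling Spark's HashPartitioner directly (runnable in spark-shell; the sample keys and the partition count of 2 are made up for illustration):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(2)    // 2 partitions, matching local[2] below
// getPartition returns the non-negative remainder of key.hashCode % numPartitions
println(partitioner.getPartition("spark"))  // 0 or 1, depending on "spark".hashCode
println(partitioner.getPartition("hadoop"))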


package com.weiyi.spark.batch

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(WordCount.getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)
    // read the input file and split each line into words
    val lines: RDD[String] = sc.textFile("d:\\people.txt")
    val splitData: RDD[String] = lines.flatMap(_.split(" "))
    // pair every word with a count of 1
    val mapData: RDD[(String, Int)] = splitData.map((_, 1))
    // groupByKey shuffles every (word, 1) pair and collects the 1s per word
    val value: RDD[(String, Iterable[Int])] = mapData.groupByKey()
    // sum the collected counts only after the shuffle
    val result: RDD[(String, Int)] = value.mapValues(_.sum)
    result.foreach(println)
    // release resources
    sc.stop()
  }

}
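
To see the intermediate RDD[(String, Iterable[Int])] that groupByKey produces before mapValues(_.sum), here is a small sketch on made-up in-memory data (the sample pairs are assumptions, not the contents of people.txt; sc is the SparkContext from the code above):

val sample: RDD[(String, Int)] = sc.parallelize(Seq(("spark", 1), ("spark", 1), ("rdd", 1)))
// groupByKey only collects the values per key; nothing is summed yet
val grouped: RDD[(String, Iterable[Int])] = sample.groupByKey()
grouped.collect().foreach { case (word, ones) => println(s"$word -> ${ones.toList}") }
// prints (order may vary): spark -> List(1, 1) and rdd -> List(1)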


2. reduceByKey

reduceByKey also uses the default HashPartitioner: key.hashCode % number of partitions.

Unlike groupByKey, however, it first combines the values inside each partition (a map-side combine) before the shuffle, so far less data crosses the network; an equivalent combineByKey sketch follows the code below.


package com.weiyi.spark.batch

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(WordCount.getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)
    // read the input file and split each line into words
    val lines: RDD[String] = sc.textFile("d:\\people.txt")
    val splitData: RDD[String] = lines.flatMap(_.split(" "))
    // pair every word with a count of 1
    val mapData: RDD[(String, Int)] = splitData.map((_, 1))
    // reduceByKey sums counts inside each partition first, then merges the partial sums after the shuffle
    val reducedData: RDD[(String, Int)] = mapData.reduceByKey(_ + _)
    reducedData.foreach(println)
    // release resources
    sc.stop()
  }

}
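
Internally, Spark's reduceByKey is implemented on top of combineByKey with map-side combine enabled. A minimal sketch of an equivalent call, again on made-up sample data (reusing sc from the code above), showing where the per-partition aggregation happens:

val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("spark", 1), ("spark", 1), ("rdd", 1)))
val combined: RDD[(String, Int)] = pairs.combineByKey(
  (v: Int) => v,                  // createCombiner: first value of a key seen in a partition
  (acc: Int, v: Int) => acc + v,  // mergeValue: runs inside each partition (map-side combine)
  (a: Int, b: Int) => a + b       // mergeCombiners: merges per-partition sums after the shuffle
)
combined.collect().foreach(println) // (spark,2) and (rdd,1), the same result as reduceByKey(_ + _)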