Spark: groupByKey vs. reduceByKey
1. groupByKey
Uses the default HashPartitioner: each record goes to partition key.hashCode % number of partitions (see the sketch after the example below). All values for a key are shuffled to that partition before any aggregation happens.
package com.weiyi.spark.batch

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  // Silence Spark's verbose INFO logging
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(WordCount.getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)

    // Read the input file and turn each word into a (word, 1) pair
    val lines: RDD[String] = sc.textFile("d:\\people.txt")
    val splitData: RDD[String] = lines.flatMap(_.split(" "))
    val mapData: RDD[(String, Int)] = splitData.map((_, 1))

    // groupByKey shuffles every (word, 1) pair; all values for a key are
    // collected into an Iterable and only then summed
    val value: RDD[(String, Iterable[Int])] = mapData.groupByKey()
    val result: RDD[(String, Int)] = value.mapValues(_.sum)
    result.foreach(println)

    // Release resources
    sc.stop()
  }
}
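
The partitioning rule described above can be checked directly with Spark's HashPartitioner, which needs no SparkContext. A minimal sketch; the sample words and the partition count of 2 (matching local[2]) are just illustrative:

import org.apache.spark.HashPartitioner

object HashPartitionerDemo {
  def main(args: Array[String]): Unit = {
    val partitioner = new HashPartitioner(2) // 2 partitions, matching local[2]

    // Each key lands in partition key.hashCode % numPartitions
    // (adjusted to be non-negative for keys whose hashCode is negative)
    Seq("hello", "spark", "world").foreach { word =>
      println(s"$word -> partition ${partitioner.getPartition(word)}")
    }
  }
}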
2. reduceByKey
Also uses the default HashPartitioner (key.hashCode % number of partitions), but values are aggregated within each partition first (a map-side combine) before the shuffle, so less data crosses the network. The sketch after the example below makes this explicit.
package com.weiyi.spark.batch

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  // Silence Spark's verbose INFO logging
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(WordCount.getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)

    // Read the input file and turn each word into a (word, 1) pair
    val lines: RDD[String] = sc.textFile("d:\\people.txt")
    val splitData: RDD[String] = lines.flatMap(_.split(" "))
    val mapData: RDD[(String, Int)] = splitData.map((_, 1))

    // reduceByKey sums values within each partition first (map-side combine),
    // so only the partial sums are shuffled across the network
    val reducedData: RDD[(String, Int)] = mapData.reduceByKey(_ + _)
    reducedData.foreach(println)

    // Release resources
    sc.stop()
  }
}
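
The per-partition combining that reduceByKey performs implicitly becomes visible if the same word count is written with aggregateByKey, where the within-partition merge (seqOp) and the cross-partition merge (combOp) are passed separately. A minimal sketch; the object name AggregateWordCount is just illustrative, and it reuses the same input path as the examples above:

package com.weiyi.spark.batch

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("AggregateWordCount")
    val sc = new SparkContext(conf)

    // Same (word, 1) pairs as in the examples above
    val mapData: RDD[(String, Int)] = sc.textFile("d:\\people.txt")
      .flatMap(_.split(" "))
      .map((_, 1))

    // aggregateByKey spells out what reduceByKey(_ + _) does implicitly:
    //   seqOp  - merges values inside a single partition (map-side combine)
    //   combOp - merges the partial sums of different partitions after the shuffle
    val result: RDD[(String, Int)] = mapData.aggregateByKey(0)(
      (partialSum, v) => partialSum + v, // seqOp: runs within each partition
      (sum1, sum2) => sum1 + sum2        // combOp: runs across partitions
    )
    result.foreach(println)

    sc.stop()
  }
}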