144、Spark核心编程进阶之aggregateByKey以及单词计数案例
reduceByKey可以认为是aggregateByKey的简化版
aggregateByKey有三个参数,比reduceByKey多提供了一个函数,即Seq Function
也就是说,可以自己控制如何对每个partition中的数据先进行本地聚合,类似于MapReduce中的map-side combine
然后才是对所有partition中的数据进行全局聚合
第一个参数是,每个key的初始值
第二个是个函数,Seq Function,如何进行shuffle map-side的本地聚合
第三个是个函数,Combiner Function,如何进行shuffle reduce-side的全局聚合
aggregateByKey原理.png
/**
 * Word-count example built on {@code aggregateByKey}.
 *
 * <p>The job reads a text file, splits every line on single spaces, maps each
 * token to a {@code (token, 1)} pair and then sums the ones per token with
 * {@code aggregateByKey}, using {@code 0} as the initial value and integer
 * addition for both the per-partition step and the cross-partition merge.
 */
public class AggregateByKey {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "E:\\testdata\\wordcount\\input\\1.txt";

    /**
     * Runs the word count on a local Spark master and prints one line per word.
     *
     * @param args optional; {@code args[0]}, when present, overrides the default input path
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        SparkConf conf = new SparkConf().setAppName("AggregateByKeyJava").setMaster("local");

        // try-with-resources guarantees the context is shut down even when the job throws;
        // the original never stopped it.
        try (JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
            // The second argument asks for 3 partitions so the per-partition step has work to do.
            JavaRDD<String> linesRDD = sparkContext.textFile(inputPath, 3);

            // One output element per space-separated token of each line.
            JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });

            // Pair every token with a count of 1.
            JavaPairRDD<String, Integer> wordNumbersRDD =
                    wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<>(word, 1);
                        }
                    });

            // Both aggregation steps are plain addition, so one function object serves as
            // the Seq Function (second argument) and the Combiner Function (third argument).
            Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            };
            JavaPairRDD<String, Integer> resultRDD = wordNumbersRDD.aggregateByKey(0, sum, sum);

            // The master is "local", so this output lands on this process's stdout.
            resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                    System.out.println(wordCount._1 + " appears " + wordCount._2 + " times");
                }
            });
        }
    }
}