史上最简单的spark教程第六章-键值对RDD聚合,分组,统计的Java操作案例-(上集)
第六章:键值对RDD的操作
- 键值对RDD的操作用途:
- 聚合,统计,分组
史上最简单的spark教程
所有代码示例地址:https://github.com/Mydreamandreality/sparkResearch
(提前声明:文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章,写作不易,转载请注明)
(文章参考:Elasticsearch权威指南,Spark快速大数据分析文档,Elasticsearch官方文档,实际项目中的应用场景)
(帮到到您请点点关注,文章持续更新中!)
Git主页 https://github.com/Mydreamandreality
- 键值对的数据一般需要经过ETL,最终转换成我们需要的数据
- spark对键值对的RDD提供了一些新的接口,pairRDD
- 提供并行操作各个键或跨节点重新进行数据分组的操作接口
- 下面都是针对pairRDD的操作案例
现在这里列举一下pairRDD的常用函数,结合java案例代码很好理解
操作数据:(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)
大概了解之后我们就可以直接看java代码的案例了
pair RDD(键值对)的转化操作
代码示例:
public static void run(JavaSparkContext sparkContext) {
JavaRDD<String> rdd = sparkContext.parallelize(Arrays.asList("test", "java", "python"));
//把RDD的第一个字符当做Key
PairFunction<String, String, String> pairFunction = new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<>(s.split(" ")[0], s);
}
};
//此处创建好pairRDD
JavaPairRDD<String, String> pairRdd = rdd.mapToPair(pairFunction);
//下层都是对pairRDD的操作演示
/*合并含有相同键的值*/
pairRdd.reduceByKey(new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
return v1 + v2;
}
});
/*相同key的元素进行分组*/
pairRdd.groupByKey();
/*对pair中的每个值进行应用*/
pairRdd.mapValues(new Function<String, Object>() {
@Override
public Object call(String v1) throws Exception {
return v1 + "sirZ";
}
});
/*返回只包含键的RDD*/
pairRdd.keys();
/*返回只包含值的RDD*/
pairRdd.values();
/*返回根据键排序的RDD*/
pairRdd.sortByKey();
}
上面的案例是针对单个RDD元素进行操作
这里就针对两个RDD进行操作 (可以理解为MySQL表连接?我不知道是否可以这么理解)
代码示例
/**
* Created by 張燿峰
* pairRDD入门案例
*
* @author 孤
* @date 2019/3/19
* @Varsion 1.0
*/
public class PairRdd {
public static void run(JavaSparkContext sparkContext) {
JavaRDD<String> rdd = sparkContext.parallelize(Arrays.asList("test", "java", "python"));
PairFunction<String, String, String> pairFunction = new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<>(s.split(" ")[0], s);
}
};
//此处创建好pairRDD
JavaPairRDD<String, String> pairRdd = rdd.mapToPair(pairFunction);
//下层都是对pairRDD的操作演示
/*合并含有相同键的值*/
pairRdd.reduceByKey(new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
return v1 + v2;
}
});
/*相同key的元素进行分组*/
pairRdd.groupByKey();
/*对pair中的每个值进行应用*/
pairRdd.mapValues(new Function<String, Object>() {
@Override
public Object call(String v1) throws Exception {
return v1 + "sirZ";
}
});
/*返回只包含键的RDD*/
pairRdd.keys();
/*返回只包含值的RDD*/
pairRdd.values();
/*返回根据键排序的RDD*/
pairRdd.sortByKey();
}
/*针对多个pairRDD元素的操作*/
public static void runPair(JavaSparkContext sparkContext) {
JavaRDD<String> rdd = sparkContext.parallelize(Arrays.asList("test", "java", "python"));
JavaRDD<String> otherRDD = sparkContext.parallelize(Arrays.asList("golang", "php", "hadoop"));
PairFunction<String, String, String> pairFunction = new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
return new Tuple2<>(s.split(" ")[0], s);
}
};
JavaPairRDD<String, String> pairRDD = rdd.mapToPair(pairFunction);
JavaPairRDD<String, String> pairRDDOther = otherRDD.mapToPair(pairFunction);
//创建好两个PairRDD之后开始操作
//删除 ==pairRDD== 中键与pairRDDOther相同的元素
JavaPairRDD<String, String> subRDD = pairRDD.subtractByKey(pairRDDOther);
//内连接 inner join 查询
JavaPairRDD<String, Tuple2<String, String>> jsonRDD = pairRDD.join(pairRDDOther);
//右连接 right join 查询 //TODO 此处我理解是可以为null的二元组
JavaPairRDD<String, Tuple2<Optional<String>, String>> rightRDD = pairRDD.rightOuterJoin(pairRDDOther);
//左连接 left join 查询
JavaPairRDD<String, Tuple2<String, Optional<String>>> leftRDD = pairRDD.leftOuterJoin(pairRDDOther);
//将两个RDD中有相同键的数据分组 //TODO 此处我理解是迭代器
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> groupRDD = pairRDD.cogroup(pairRDDOther);
}
以上就是spark键值对的操作简单案例
首先创建pariRDD(键值对RDD),然后对创建好的PairRDD进行操作
具体的使用看代码中注释,后续还会有更深入的案例
此处需要注意一下: PairRDD也还是RDD组,同样支持RDD所支持的函数
代码案例:
//pairRDD也可以使用RDD的函数
//筛选length小于20的元素
Function<Tuple2<String,String>,Boolean> filterRDD = new Function<Tuple2<String, String>, Boolean>() {
@Override
public Boolean call(Tuple2<String, String> v1) {
return (v1._2.length()<20);
}
};
JavaPairRDD<String,String> filter = pairRDD.filter(filterRDD);
聚合操作
当数据集以键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操作,之前说过基础 RDD上的 fold(),combine(),reduce() 等行动操作,pair RDD 上则有相应的针对键的转化操作.Spark 有一组类似的操作,可以组合具有相同键的值,这些操作返回 RDD,因此它们是转化操作而不是行动操作
代码案例
JavaRDD<String> wordCount = sparkContext.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
//返回一个可以迭代的集合
JavaRDD<String> c = wordCount.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String v1) throws Exception {
return Arrays.asList(v1.split(" ")).iterator();
}
});
//现在的数据是 1,2,3,4,5
JavaPairRDD<String, Integer> result = c.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
//此时的数据是 {1,1},{2,1},{3,1},{4,1}...
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
/*还可以通过countByValue快速实现单词计数*/
c.countByValue();
总结
pairRDD(键值对RDD)
有很多函数可以进行基于键的数据合并,它们中的大多数都是在 combineByKey() 的基础上实现的,为用户提供了更简单的接口
combineByKey()的工作原理如下:
本人纯手工画的,见谅 [狗头]