Spark(java)二次排序
#楔子
学习《数据算法 Hadoop Spark大数据处理技巧》中Spark部分
1 二次排序:简介
二次排序(secondary sort) 问题是指归约阶段与某个键关联的值排序。有时这也称为值键转换(value-key conversion)。利用二次排序技术,可以传入各个归约器的值完成排序。
二次排序问题解决方案
归约器值排序至少有2种方案。这些使用hadoop和Spark都可以应用。
- 第一种:让归约器读取和缓存给定键的所有值,然后对这些值完成一个归约器中排序。这种方法不具有可伸缩性,因为归约器要接收一个给定键的所有值。这种方法可能导致归约器耗尽内存,另一方面,如果数量很少,不会导致内存溢出错误,那么这种方法是适用的。
- 第二种:使用Mapreduce框架对归约器值排序。这种方法“会为自然键增加或整个值来创建一个组合键完成归约器中排序”。这种方法可伸缩,不会内存溢出。
实现细节
要实现二次排序,还需要一个Java插件类。我们要告诉Mapreduce/Hadoop框架:
- 如何对归约器键排序
- 如何对传入归约器的键分区
- 如何对到达各个归约器的数据分组
Spark的二次排序解决方案
在Spark中解决二次排序问题,至少要2种方案:
方案1:
将一个给定键的所有值读取并缓存到一个List数组结构中,然后对这些值完成归约器中排序。如果每个归约器键的值集很小(完全可以放在内存中)。这种是可以的
方案2:
使用Spark框架对归约器值排序(这种做法不需要对传入归约器的值完成归约器中排序)。这种方法"会为自然键增加部分或整个值来创建一个组合键以实现排序目标"。这种方法是可伸缩的。
方案一:内存中实现二次排序
由于Spark有一个非常强大的高层API,这里将用一个Java类提供整个解决方案。SparkAPI 建立在弹性分布式数据集基本抽象概念基础上,要充分利用Spark API,首先必须理解RDD,RDD类包含了所有RDD可用的基本MapReduce操作。JavaPairRDD<k,v>类则包含了mapTopPair、flatMapToPair和groupByKey等MapReduce操作。
步骤
- 导入需要的包
- 输入数据并验证(我操作时数据规则,就不做验证了)
- 创建JavaSparkContext对象。
- 使用JavaSparkContext为上下文创建一个RDD。得到的RDD将是JavaRDD ,这个rdd各个元素分别是name,time,value
- 从javaRDD创建键值对,键是name,值是(time,value)。得到新的RDD
- 按照键(name)对JavaPairDD元素分组
- 对归约器值排序得到最终输出
数据
x 2 9
y 2 5
x 1 3
y 1 7
y 3 1
x 3 6
z 1 4
z 2 8
z 3 7
z 4 0
p 2 6
p 4 7
p 1 9
p 6 0
p 7 3
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import com.google.common.collect.Lists;
import com.zhuzi.utils.SparkUtils;
/**
* @Title: SortBySpark.java
* @Package com.zhuzi.bookj.char01
* @Description: TODO(spark排序 方案1 内存中实现二次排序)
* @author 作者 grq
* @version 创建时间:2018年11月28日 下午1:58:39
*
*/
public class SortBySpark {
@SuppressWarnings("serial")
public static void main(String[] args) {
String filePath = SparkUtils.getFilePath("data/j/char01_sort.txt");
// 步骤3 创建JavaSparkContext 对象
JavaSparkContext sparkContext = SparkUtils.getJavaSparkContext();
// 步骤4 使用JavaSparkContext为上下文创建一个RDD。得到的RDD将是JavaRDD<String>
// ,这个rdd各个元素分别是name,time,valu
JavaRDD<String> lines = sparkContext.textFile(filePath, 1);
for (String string : lines.collect()) {
// System.out.println(string);
}
// 步骤5 从javaRDD创建键值对,键是name,值是(time,value)。得到新的RDD(为了方便我直接使用字符串)
JavaPairRDD<String, Tuple2<String, String>> pairRDD = lines.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
@Override
public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
String[] tokens = s.split(" ");
Tuple2<String, String> value = new Tuple2<String, String>(tokens[1], tokens[2]);
return new Tuple2<String, Tuple2<String, String>>(tokens[0], value);
}
});
for (Tuple2<String, Tuple2<String, String>> pairTemp : pairRDD.collect()) {
// System.out.println(pairTemp);
}
// 步骤 6 按照键(name)对JavaPairDD元素分组
JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupByKey = pairRDD.groupByKey().sortByKey();
for (JavaPairRDD<String, Iterable<Tuple2<String, String>>> temp : Lists.newArrayList(groupByKey)) {
List<Tuple2<String, Iterable<Tuple2<String, String>>>> collect = temp.collect();
for (Tuple2<String, Iterable<Tuple2<String, String>>> tuple2 : collect) {
System.out.println("------step 6--->" + tuple2);
}
}
// (z,[(1,4), (2,8), (3,7), (4,0)])
// (p,[(2,6), (4,7), (1,9), (6,0), (7,3)])
// (x,[(2,9), (1,3), (3,6)])
// (y,[(2,5), (1,7), (3,1)])
// 步骤7 在内存中对归约器值排序
groupByKey.persist(StorageLevel.MEMORY_ONLY());
JavaPairRDD<String, Iterable<Tuple2<String, String>>> soreResult = groupByKey.mapValues(new Function<Iterable<Tuple2<String, String>>, // 输入
Iterable<Tuple2<String, String>// 输出
>>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterable<Tuple2<String, String>> v1) throws Exception {
List<Tuple2<String, String>> newList = Lists.newArrayList(v1);
Collections.sort(newList, new Tuplecompatrtor());
return newList;
}
});
List<Tuple2<String, Iterable<Tuple2<String, String>>>> collect = soreResult.collect();
for (Tuple2<String, Iterable<Tuple2<String, String>>> temp : collect) {
ArrayList<Tuple2<String, String>> newArrayList = Lists.newArrayList(temp._2());
System.out.print(temp._1() + "=>[");
for (Tuple2<String, String> temp2 : newArrayList) {
System.out.print(temp2._2() + " ");
}
System.out.println("]");
}
// p=>[9 6 7 0 3 ]
// x=>[3 9 6 ]
// y=>[7 5 1 ]
// z=>[4 8 7 0 ]
sparkContext.close();
System.exit(0);
}
}
public class Tuplecompatrtor implements Comparator<Tuple2<String, String>> {
@Override
public int compare(Tuple2<String, String> o1, Tuple2<String, String> o2) {
return o1._1.compareTo(o2._1);
}
}
方案2:使用Spark框架实现二次排序
方案1中,使用的是内存中(Collections.sort())对归约器值排序。如果归约器值无法完全放入内存。这种方案就不具有可伸缩性。如果一个按自然键(name)分区的分区器,能保留RDD的顺序,这将是一种可行的方案。
后记
目前还是使用的是list集合的排序。对于大数据不合适,其他有待继续学习。