Spark(java)二次排序

#楔子

学习《数据算法 Hadoop Spark大数据处理技巧》中Spark部分

1 二次排序:简介

​ 二次排序(secondary sort) 问题是指归约阶段与某个键关联的值排序。有时这也称为值键转换(value-key conversion)。利用二次排序技术,可以传入各个归约器的值完成排序。

二次排序问题解决方案

​ 归约器值排序至少有2种方案。这些使用hadoop和Spark都可以应用。

  • 第一种:让归约器读取和缓存给定键的所有值,然后对这些值完成一个归约器中排序。这种方法不具有可伸缩性,因为归约器要接收一个给定键的所有值。这种方法可能导致归约器耗尽内存,另一方面,如果数量很少,不会导致内存溢出错误,那么这种方法是适用的。
  • 第二种:使用Mapreduce框架对归约器值排序。这种方法“会为自然键增加或整个值来创建一个组合键完成归约器中排序”。这种方法可伸缩,不会内存溢出。

实现细节


​ 要实现二次排序,还需要一个Java插件类。我们要告诉Mapreduce/Hadoop框架:

  1. 如何对归约器键排序
  2. 如何对传入归约器的键分区
  3. 如何对到达各个归约器的数据分组

Spark的二次排序解决方案

在Spark中解决二次排序问题,至少要2种方案:

方案1:

​ 将一个给定键的所有值读取并缓存到一个List数组结构中,然后对这些值完成归约器中排序。如果每个归约器键的值集很小(完全可以放在内存中)。这种是可以的

方案2:

​ 使用Spark框架对归约器值排序(这种做法不需要对传入归约器的值完成归约器中排序)。这种方法"会为自然键增加部分或整个值来创建一个组合键以实现排序目标"。这种方法是可伸缩的。

Spark(java)二次排序

方案一:内存中实现二次排序


​ 由于Spark有一个非常强大的高层API,这里将用一个Java类提供整个解决方案。SparkAPI 建立在弹性分布式数据集基本抽象概念基础上,要充分利用Spark API,首先必须理解RDD,RDD类包含了所有RDD可用的基本MapReduce操作。JavaPairRDD<k,v>类则包含了mapTopPair、flatMapToPair和groupByKey等MapReduce操作。

步骤

  1. 导入需要的包
  2. 输入数据并验证(我操作时数据规则,就不做验证了)
  3. 创建JavaSparkContext对象。
  4. 使用JavaSparkContext为上下文创建一个RDD。得到的RDD将是JavaRDD ,这个rdd各个元素分别是name,time,value
  5. 从javaRDD创建键值对,键是name,值是(time,value)。得到新的RDD
  6. 按照键(name)对JavaPairDD元素分组
  7. 对归约器值排序得到最终输出

数据

x 2 9
y 2 5
x 1 3
y 1 7
y 3 1
x 3 6
z 1 4
z 2 8
z 3 7
z 4 0
p 2 6
p 4 7
p 1 9
p 6 0
p 7 3
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import com.google.common.collect.Lists;
import com.zhuzi.utils.SparkUtils;

/**
 * @Title: SortBySpark.java
 * @Package com.zhuzi.bookj.char01
 * @Description: TODO(spark排序 方案1 内存中实现二次排序)
 * @author 作者 grq
 * @version 创建时间:2018年11月28日 下午1:58:39
 *
 */
public class SortBySpark {
	@SuppressWarnings("serial")
	public static void main(String[] args) {
		String filePath = SparkUtils.getFilePath("data/j/char01_sort.txt");

		// 步骤3 创建JavaSparkContext 对象
		JavaSparkContext sparkContext = SparkUtils.getJavaSparkContext();
		// 步骤4 使用JavaSparkContext为上下文创建一个RDD。得到的RDD将是JavaRDD<String>
		// ,这个rdd各个元素分别是name,time,valu
		JavaRDD<String> lines = sparkContext.textFile(filePath, 1);
		for (String string : lines.collect()) {
			// System.out.println(string);

		}
		// 步骤5 从javaRDD创建键值对,键是name,值是(time,value)。得到新的RDD(为了方便我直接使用字符串)
		JavaPairRDD<String, Tuple2<String, String>> pairRDD = lines.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
			@Override
			public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
				String[] tokens = s.split(" ");
				Tuple2<String, String> value = new Tuple2<String, String>(tokens[1], tokens[2]);
				return new Tuple2<String, Tuple2<String, String>>(tokens[0], value);
			}
		});
		for (Tuple2<String, Tuple2<String, String>> pairTemp : pairRDD.collect()) {
			// System.out.println(pairTemp);
		}

		// 步骤 6 按照键(name)对JavaPairDD元素分组

		JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupByKey = pairRDD.groupByKey().sortByKey();
		for (JavaPairRDD<String, Iterable<Tuple2<String, String>>> temp : Lists.newArrayList(groupByKey)) {
			List<Tuple2<String, Iterable<Tuple2<String, String>>>> collect = temp.collect();
			for (Tuple2<String, Iterable<Tuple2<String, String>>> tuple2 : collect) {
				System.out.println("------step 6--->" + tuple2);
			}
		}
		// (z,[(1,4), (2,8), (3,7), (4,0)])
		// (p,[(2,6), (4,7), (1,9), (6,0), (7,3)])
		// (x,[(2,9), (1,3), (3,6)])
		// (y,[(2,5), (1,7), (3,1)])

		// 步骤7 在内存中对归约器值排序

		groupByKey.persist(StorageLevel.MEMORY_ONLY());
		JavaPairRDD<String, Iterable<Tuple2<String, String>>> soreResult = groupByKey.mapValues(new Function<Iterable<Tuple2<String, String>>, // 输入
				Iterable<Tuple2<String, String>// 输出
				>>() {
					@Override
					public Iterable<Tuple2<String, String>> call(Iterable<Tuple2<String, String>> v1) throws Exception {
						List<Tuple2<String, String>> newList = Lists.newArrayList(v1);
						Collections.sort(newList, new Tuplecompatrtor());
						return newList;
					}
				});

		List<Tuple2<String, Iterable<Tuple2<String, String>>>> collect = soreResult.collect();
		for (Tuple2<String, Iterable<Tuple2<String, String>>> temp : collect) {
			ArrayList<Tuple2<String, String>> newArrayList = Lists.newArrayList(temp._2());
			System.out.print(temp._1() + "=>[");
			for (Tuple2<String, String> temp2 : newArrayList) {
				System.out.print(temp2._2() + "  ");
			}
			System.out.println("]");
		}
		// p=>[9 6 7 0 3 ]
		// x=>[3 9 6 ]
		// y=>[7 5 1 ]
		// z=>[4 8 7 0 ]
		sparkContext.close();
		System.exit(0);
	}
}

public class Tuplecompatrtor implements Comparator<Tuple2<String, String>> {

	@Override
	public int compare(Tuple2<String, String> o1, Tuple2<String, String> o2) {

		return o1._1.compareTo(o2._1);
	}
}

方案2:使用Spark框架实现二次排序


​ 方案1中,使用的是内存中(Collections.sort())对归约器值排序。如果归约器值无法完全放入内存。这种方案就不具有可伸缩性。如果一个按自然键(name)分区的分区器,能保留RDD的顺序,这将是一种可行的方案。

后记

目前还是使用的是list集合的排序。对于大数据不合适,其他有待继续学习。