spark
1)map操作
/**
* map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
* 将原始集合中的每一个元素*7
* map的操作是一个one to one的操作
*/
private def mapMethod(sc: SparkContext) = {
val list = 1 to 10
val listRDD = sc.parallelize(list)
val bs = 7
println("listRDD's partition: " + listRDD.getNumPartitions)
val retRDD: RDD[Int] = listRDD.map(num => num * bs)
retRDD.foreach(num => println(num))
}
2)flatmap操作
/**
* flatMap(func):类似于map,但是每一个输入元素,
* 会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
*
* flatMap是one to many
* @param sc
*/
private def flatMapMethod(sc: SparkContext) = {
val list = List(
"hello you",
"hello me he",
"you you"
)
val listRDD:RDD[String] = sc.parallelize(list)
val wordsRDD:RDD[String] = listRDD.flatMap{case line => {
line.split(" ")
}}
wordsRDD.foreach(println)
}
3)filter操作
/**
* filter(func): 返回一个新的数据集,由经过func函数后返回值为true的原元素组
成
* 过滤出集合中的偶数even,留下奇数odd
*/
private def filterMethod(sc: SparkContext) = {
val range = 1 until 10
val listRDD = sc.parallelize(range)
val filteredRDD = listRDD.filter{case num => {num % 2 != 0}}
filteredRDD.foreach(println)
}
4)sample操作
/**
* 采集抽样:
* 就是用样本空间来代替总体,查看总体数据的分布。
* 这个Spark的sample抽样是否非准确抽样,如果要抽取固定的数据量,只能自
己手动处理。
* 会在后面学习Spark如何解决数据倾斜的时候使用
* sample(withReplacement, frac, seed):
* 根据给定的随机种子seed,随机抽样出数量为frac的数据
*/
private def sampleMethod(sc: SparkContext) = {
val range = 1 to 100000
val listRDD = sc.parallelize(range)
val sampleRDD = listRDD.sample(true, 0.002)
sampleRDD.foreach(println)
println(s"sampleRDD的元素个数:${sampleRDD.count()}")
}
5)union操作
/**
* union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成
* 该操作类似于sql中的union all的操作(而sql中的union会做去重操作)
*/
private def unionMethod(sc: SparkContext) = {
val list1 = List(1, 2, 3, 4, 5)
val list2 = List(3, 5, 6, 8, 10)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val unionRDD = listRDD1.union(listRDD2)
unionRDD.foreach(println)
println("-----------------------------------")
val interRDD = listRDD1.intersection(listRDD2)//求rdd的交集
interRDD.foreach(println)
}
4,SparkRDD的action操作
package com.aura.bigdata.spark.core.p2
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.{DefaultCodec, SnappyCodec}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 学习SparkRDD的action操作
* action操作是spark作业job执行的动因
*/
object _04SparkActionOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val conf = new SparkConf().setAppName("_03SparkTransformationOps")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val list = List(
"hello you",
"hello me he",
"hello shit he",
"you you"
)
val listRDD: RDD[String] = sc.parallelize(list)
val wordsRDD: RDD[String] = listRDD.flatMap { case line => {
line.split("\\s+")
}
}
val pairsRDD = wordsRDD.map((_, 1))
/*
reduce(func): 通过函数func聚集数据集中的所有元素。Func函数接受2个
参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
该reduce操作和scala中的reduce操作一模一样
val list = 1 to 100
val listRDD = sc.parallelize(list)
val ret = listRDD.reduce((v1, v2) => v1 + v2)
*/
/*
collect():
在Driver的程序中,以数组的形式,返回数据集的所有元素。
这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,
直接将整个RDD集Collect返回,很可能会让Driver程序OOM
将RDD中各个分区中的数据,全都拉取到driver中。
*/
pairsRDD.collect().foreach(println)
/**
* count(): 返回数据集rdd的元素个数
*/
println("===========pairsRDD总共有" + pairsRDD.count() + "条记录。")
/*
take(n): 返回一个数组,由数据集的前n个元素组成。
注意,这个操作目前并非在多个节点之上并行执行,而是Driver程序所在机器,
单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
*/
println("=====================take(3)=====")
pairsRDD.take(3).foreach(println)
println("=====================takeOrdered(2)=====")
val retRDD = pairsRDD.reduceByKey(_+_)
retRDD.takeOrdered(2)(new Ordering[(String, Int)](){
override def compare(x: (String, Int), y: (String, Int)) = {
var ret = y._2.compareTo(x._2)
if(ret == 0) {
ret = x._1.compareTo(y._1)
}
ret
}
}).foreach(println)
// first(): 返回数据集的第一个元素(类似于take(1))
/*
saveAsTextFile(path):将数据集的元素,以textfile形式,保存到本地文件系统,
hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString
方法,
并将它转换为文件中的一行文本
*/
retRDD.saveAsTextFile("file:///E:/data/spark/wc" +
System.currentTimeMillis())
/**
* saveAsSequenceFile(path):
* 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系
统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-
value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为
* Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
*/
// retRDD.saveAsSequenceFile("file:///E:/data/spark/seq" +
System.currentTimeMillis())
retRDD.saveAsSequenceFile(
"file:///E:/data/spark/seq" + System.currentTimeMillis(),
Option(classOf[DefaultCodec])
)
// foreach(func): 在数据集的每一个元素上,运行函数func。这通常用于更新一
个累加器变量,或者和外部存储系统做交互
retRDD.saveAsNewAPIHadoopFile(
"file:///E:/data/spark/nahf" + System.currentTimeMillis(),
classOf[Text],
classOf[IntWritable],
classOf[TextOutputFormat[Text, IntWritable]]
)
sc.stop()
}
}