Spark常用算子

Spark的算子分为两种

1.transformation(转换):这种算子是延迟加载,因为消息缓存比较大
一旦使用了transformation算子,sc会记录使用了那些算子,算子里面使用了什么函数
2.action:触发计算
一旦触发action算子,就会立即执行计算

RDD有两种创建方式:

1.是从外界的文件系统中读取数据来创建RDD
2.通过scala集合创建RDD,要将集合并行化(parallelize)

RDD有两种算子:

1.transformation没有立即读取数据,只是记录了读取数据的路径
2.action触发读数据、计算数据

常用Transformation(即转换,延迟加载)

#通过并行化scala集合创建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6))
#查看该rdd的分区数量
rdd1.partitions.length

Spark常用算子
//升序排序
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
Spark常用算子
//过滤
val rdd3 = rdd2.filter(_>10)
Spark常用算子
//字典序排序
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
Spark常用算子
Spark常用算子
val rdd4 = sc.parallelize(Array(“a b c”,“d e f”,“h i j”))
//切割压平
rdd4.flatMap(_.split(" “)).collect
Spark常用算子
val rdd5 = sc.parallelize(List(List(“a b c”,“a b b”),List(“e f g”,“a f g”),List(“h i j”,“a a b”)))
rdd5.flatMap(_.flatMap(_.split(” “))).collect
Spark常用算子
#union求并集,注意类型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
Spark常用算子
去除相同的元素
rdd8.distinct.sortBy(x=>x).collect
Spark常用算子
#intersection求交集
val rdd9 = rdd6.intersection(rdd7)
Spark常用算子
val rdd1 = sc.parallelize(List((“tom”,1),(“jerry”,2),(“kitty”,3)))
val rdd2 = sc.parallelize(List((“jerry”,9),(“tom”,8),(“shuke”,7)))
#join
val rdd3 = rdd1.join(rdd2)
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd3 = rdd1.rigthOuterJoin(rdd2)
Spark常用算子
Spark常用算子
Spark常用算子
#groupByKey
val rdd3 = rdd1.union(rdd2)
rdd3.groupByKey
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
Spark常用算子
Spark常用算子
#WordCount(集群模式每个节点都要有该文件)
//效率高(有combiner操作)
sc.textFile(”/root/words.txt").flatMap(x=>x.split(" “)).map((_,1)).reduceByKey(_+_).sortBy(_._2,false). collect
//效率低(shuffle需要传输的较多)
sc.textFile(”/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect

Spark常用算子
Spark常用算子
Spark常用算子

#cogroup
val rdd1 = sc.parallelize(List((“tom”,1),(“tom”,2),(“jerry”,3),(“kitty”,2)))
val rdd2 = sc.parallelize(List((“jerry”,2),(“tom”,1),(“shuke”,2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1,t._2._1.sum+t._2._2.sum))
Spark常用算子
Spark常用算子
#cartesian笛卡尔积
val rdd1 = sc.parallelize(List(“tom”,“jerry”))
val rdd2 = sc.parallelize(List(“tom”,“kitty”,“shuke”))
val rdd3 = rdd1.cartesian(rdd2)
Spark常用算子

Spark action
val rdd1 = sc.parallelize(List(1,2,3,4,5),2)

#collect
rdd1.collect
Spark常用算子
#reduce
val rdd2 = rdd1.reduce(_+_)
Spark常用算子
#count
rdd1.count
Spark常用算子
#top
rdd1.top(2)
Spark常用算子
#take
rdd1.take(2)
Spark常用算子
#first(与take(1)相似)
rdd1.first
Spark常用算子
#takeOrdered
rdd1.takeOrdered(3)
Spark常用算子

高阶算子

mapPartitionsWithIndex:把每个partition中的分区号和对应的值拿出来
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1.mapPartitionsWithIndex(func).collect
Spark常用算子
aggregate
def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1.mapPartitionsWithIndex(func1).collect
###是action操作, 第一个参数是初始值, 二:是2个函数[每个函数都是2个参数(第一个参数:先对个个分区进行合并, 第二个:对个个分区合并后的结果再进行合并), 输出一个参数]
###0 + (0+1+2+3+4 + 0+5+6+7+8+9)
rdd1.aggregate(0)(_+_, _+_)
rdd1.aggregate(0)(math.max(_, _), _ + _)
###6和1比, 得6再和234比得6 -->6和6789比,得9 --> 6 + (6+9)
rdd1.aggregate(5)(math.max(_, _), _ + _)
Spark常用算子
Spark常用算子
val rdd2 = sc.parallelize(List(“a”,“b”,“c”,“d”,“e”,“f”),2)
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
“”+”“abc+”“def->abcdef
rdd2.rdd2.aggregate(””)(_ + _, _ + _)
=+=abc+=def->==abc=def
rdd2.aggregate(”=")(_ + _, _+ _)
Spark常用算子
val rdd3 = sc.parallelize(List(“12”,“23”,“345”,“4567”),2)
“”(0).length和"12"(2).length比较,toString->“2”;“2”(1).length再与"23"(2).length比较->2
“”.(0)length和"345"(3).length比较,toString->“3”;“3”(1).legth再与"4567"(4).length比较->4
因为是并行执行的,结果有可能为24也有可能为42
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
val rdd4 = sc.parallelize(List(“12”,“23”,“345”,""),2) 结果(10,01)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
val rdd5 = sc.parallelize(List(“12”,“23”,"",“345”),2) 结果(11)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
Spark常用算子
Spark常用算子
Spark常用算子
aggregateByKey
val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
partID=0(cat)0与2比较->2与5比较->5
(mouse)0与4比较->4
partID=1(cat)0与12比较->12
(dog)0与12比较->12
(mouse)0与2比较->2
(cat,5+12) (mouse,4+2) (dog,12)
pairRDD.aggregateByKey(0)(math.max(_, _), _+ _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _+ _).collect
Spark常用算子
combineByKey : 和reduceByKey是相同的效果
###第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算
###每个分区中每个key中value中的第一个值, (hello,1)(hello,1)(good,1)–>(hello(1,1),good(1))–>x就相当于hello的第一个1, good中的1
val rdd1 = sc.textFile(“hdfs://cdh:9000/SparkWordCount/words.txt”).flatMap(_.split(” “)).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd1.collect
rdd2.collect
Spark常用算子
countByKey
val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1)))
rdd1.countByKey
rdd1.countByValue
Spark常用算子
filterByRange(范围过滤)
val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1),(“b”, 6))))
val rdd2 = rdd1.filterByRange(“b”, “d”)
rdd2.collect
Spark常用算子
foldByKey
val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey(”")(_+_)
Spark常用算子
keyBy : 以传入的参数做key
val rdd1 = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
Spark常用算子
keys values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect
Spark常用算子