spark算子--transformation篇
spark算子分为两大种,一种是transformation算子,另一种是action算子。其实细分的话transformation算子又可以细分为value型和k-v型,个人感觉没必要,也就不细化了,省得把大家搞晕。
transformation又叫转换算子,它从一个RDD到另一个RDD是延迟执行的,不会马上触发作业的提交,只有在后续遇到某个action算子时才执行;
action算子会触发SparkContext提交Job,并将数据输出spark系统。今天先来举例讲解一下transformation算子。
1) map
最常用的一个算子,对RDD中每一个元素进行运算。
举个栗子:
val rdd = sc.parallelize(List("hello","world!","hi","beijing"),2)
val sizeRdd = rdd.map(_.length)
rdd.zip(sizeRdd).collect().foreach(println)
输出:
(hello,5)
(world!,6)
(hi,2)
(beijing,7)
2) filter
过滤元素用的算子,取符合条件的元素。
举个栗子:
val rdd = sc.parallelize(List("hello","world!","hi","beijing"),2)
rdd.filter(_.length % 2 == 0).collect().foreach(println)
输出:
world!
hi
3) distinct
去重的算子,底层是reduceByKey实现的。
举个栗子:
val rdd = sc.parallelize(List("hello","world!","hello","beijing"),2)
rdd.distinct().collect().foreach(println)
输出:
hello
beijing
world!
4) reduceByKey
是个k-v算子,按key进行运算,与groupByKey有本质的区别,因为此算子会在本地先进行reduce再与集群其他机器进行reduce,而groupByKey是直接在整个集群中进行shuffle,groupByKey的网络开销要比reduceByKey大很多。
所以建议优先使用reduceByKey。
reduceByKey适用于比较大小、求最值、按key求和等场景。
举个栗子:
val rdd = sc.makeRDD(List(("hello",2),("hello",1),("world",5),("world",9)))
rdd.reduceByKey((x,y)=>if(x<y) x else y).collect().foreach(println)
输出:
(hello,1
(world,5)
5) join
是个k-v算子,对两个RDD按key内关联,只取两个RDD共有的部分。
举个栗子:
val rdd1 = sc.makeRDD(List(("hello",1),("world",2),("hi",3)))
val rdd2 = sc.makeRDD(List(("hello",2),("java",2),("hi",5)))
rdd1.join(rdd2).foreach(println)
输出:
(hi,(3,5))
(hello,(1,2))
6) union
集合运算,对两个RDD集合求并集,对结果集不去重
举个栗子:
val rdd1 = sc.makeRDD(List("hello","world","hello","beijing"))
val rdd2 = sc.makeRDD(List("hi","world","hi","beijing"))
rdd1.union(rdd2).foreach(println)
输出:
hello
world
hello
beijing
hi
world
hi
beijing
7) intersection
集合运算,对两个RDD集合求交集,并对结果集去重
举个栗子:
val rdd1 = sc.makeRDD(List("hello","world","hello","beijing"))
val rdd2 = sc.makeRDD(List("hi","world","hi","beijing"))
rdd1.intersection(rdd2).collect().foreach(println)
输出:
beijing
world
8) repartition / coalesce
repartition:重分区,传入参数为要分成几个区。会进行shuffle操作,把元素尽量均匀的分布在这几个分区中。是coalesce参数为true的特例。
举个栗子:
val rdd1 = sc.parallelize(List("hello","world!","hi","beijing"),2)
println(rdd1.getNumPartitions)
val rdd2 = rdd1.repartition(4)
println(rdd2.getNumPartitions)
输出:
2
4
coalesce:重新分区,传入参数为要分成的几个分区。
与repartition不同的是它不会shuffle,有可能会导致各分区中数据极不平衡的问题。
9) cache / persist
两者都是对RDD进行缓存。cache是persist的一个特例,cache把数据缓存到了内存中,无序列化。
至于persist嘛,就是可以选择缓存到内存、硬盘或自己指定是否序列化,相比cache更灵活
10) mapValues
举个栗子:
val rdd = sc.makeRDD(List(("hello",1),("world",2),("hi",3)))
rdd.mapValues(_ * 2).collect().foreach(println)
输出:
(hello,2)
(world,4)
(hi,6)
11) leftOutJoin / rightOutJoin
与join算子是一家人,与join内关联不同,leftOutJoin/rightOutJoin是外关联,两个RDD进行关联时以左/右为主,关联不上的算None
举个栗子:
val rdd1 = sc.makeRDD(List(("hello",1),("world",2),("hi",3)))
val rdd2 = sc.makeRDD(List(("hello",2),("java",2),("hi",5)))
rdd1.leftOuterJoin(rdd2).foreach(println)
输出:
(hi,(3,Some(5)))
(hello,(1,Some(2)))
(world,(2,None))
rightOutJoin 例子同 leftOutJoin,不再赘述
12) mapPartitions
与map不同,map是对RDD中每条数据进行运算,而mapPartitions是获取了分区的迭代器,对n个分区执行n遍运算就行。
举个栗子:
rdd.mapPartitions(partiton=>{
partiton.map(line=>{
…
}
)
})
适用场景:比较适合需要分批处理数据的情况,比如在数据库操作中,将数据插入某个表,每批数据只需要开启一次数据库连接就行,这样大大减少了连接开支
但mapPartitions也是有缺点的,比如在数据量大的时候,比如处理100万条数据,map在发现内存不够时会把遍历过的数据从内存中垃圾回收处理,而mapPartitions则不会。
13) flatMap
压扁操作。
与map区别是:map会把n个元素转换成n个元素,而flatMap会把n个元素转换成n个集合,然后再把这n个集合合并到一个集合中
举个栗子:
val rdd1 = sc.makeRDD(List("hello,world","hi,beijing"))
rdd1.flatMap(_.split(",")).collect().foreach(println)
输出:
hello
world
hi
beijing
14) mapPartitionsWithIndex
与mapPartitions差不多,区别就是返回值中带上了分区的编号
举个栗子:
val rdd = sc.parallelize(List("hello","world!","hi","beijing"),3)
rdd.mapPartitionsWithIndex((index,iter)=>{
iter.toList.map(x=>"[ID:"+index+",value:"+x+"]").iterator
}).collect().foreach(println)
输出:
[ID:0,value:hello]
[ID:1,value:world!]
[ID:2,value:hi]
[ID:2,value:beijing]
15) groupByKey
groupByKey方法主要作用是将键相同的所有的键值对分组到一个集合序列当中,
如(a,1),(a,3),(b,1),(c,1),(c,3), 分组后结果是 ( (a,1), (a,3)) , (b,1), ( (c,1), (c,3) ),
分组后的集合中的元素顺序是不确定的,比如键a的值集合也可能是( (a,3), (a,1) ).
相对而言,groupByKey方法是比较昂贵的操作,意思就是说比较消耗资源。
与reduceByKey区别: reduceByKey作用是聚合,异或等,groupByKey作用主要是分组,也可以在分组之后做聚合
16) aggregateByKey
aggregateByKey是先对每个partition中的数据根据不同的Key进行aggregate,然后将结果进行shuffle,完成各个partition之间的aggregate。因此,和groupByKey相比,运算量小了很多。
此算子小编用的也不多,所以也就不多做介绍了。
17) combineByKey
combineByKey是通用API,供reduceByKey和aggregateByKey使用,不常用,不多做介绍
18) sortByKey
是个k-v算子。按key进行排序。
举个栗子:
val rdd = sc.makeRDD(List(("111",1),("333",3),("222",2)))
rdd.sortByKey().collect().foreach(println)
输出:
(111,1)
(222,2)
(333,3)
19) sample
抽样。
举个栗子:
var rdd1 = sc.makeRDD(Array("aaa","bbb","ccc","ddd","eee"))
val rdd2: RDD[String] = rdd1.sample(false,0.3)
rdd2.collect().foreach(println)
输出:
ccc
eee
上述例子中的false表示抽中了aaa元素之后就不再抽取它,true表示可以某元素可以循环被抽取
注意:takeSample算子是个action算子,网上好多文章把takeSample归到了transformation算子中,这是不对的,因为takeSample会把抽样的数据加载到driver中。
20) cogroup
两个集合的关联操作,官方解释是:将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])的数据集。
举个栗子:
val rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.parallelize(List(("x",77),("b",88),("z",99)))
rdd1.cogroup(rdd2).collect().foreach(println)
输出:
(z, (CompactBuffer(),CompactBuffer(99)))
(x, (CompactBuffer(),CompactBuffer(77)))
(a, (CompactBuffer(1),CompactBuffer()))
(b, (CompactBuffer(2),CompactBuffer(88)))
(c, (CompactBuffer(3),CompactBuffer()))
cogroup 有点类似于 groupByKey 算子,都是把相同 key 的元素进行分组,但是与cogroup不同的是:
1) groupByKey 是对于一个RDD中的所有元素按照 key 进行分组,而cogroup 是对于两个RDD的元素按照key 进行分组;
2) groupByKey返回的是RDD[(key,Iterable[value])],而cogroup返回的是RDD[(key, (Iterable[value1]), Iterable[value2])]的形式;
3) 两者的操作对象都必须是键值对形式的RDD;
21) cartesian
笛卡尔积,两表的生硬连接,左表的每一个记录去关联右表的每一个元素。不常用。
举个栗子:
val rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.parallelize(List(("x",77),("b",88),("z",99)))
rdd1.cartesian(rdd2).collect().foreach(println)
输出:
((a,1),(x,77))
((a,1),(b,88))
((a,1),(z,99))
((b,2),(x,77))
((b,2),(b,88))
((b,2),(z,99))
((c,3),(x,77))
((c,3),(b,88))
((c,3),(z,99))
22) pipe
调用Shell命令,通过pipe变换算子将一些shell命令用于Spark中生成新的RDD。
举个栗子:
rdd.pipe("head -n 1") // 取每个分区中的第一个元素构成新的RDD
23) repartitionAndSortWithinPartitions
RDD进行重新分区,并在每个新分区内按key进行排序。相当于repartition和sortByKey合并到一个算子中进行了,更高效。
喜欢的话请微信扫描下方二维码关注我!