Spark—键值对操作
Spark—键值对操作
简述
键值对RDD是Spark中许多操作所需要的常见数据类型。一般通过一些初始ETL(抽取、转化、装载)操作将数据转化为键值对形式。
动机
Spark中包含键值对类型的RDD被称为pair RDD。Pair RDD是很多程序的构成要素,因为它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。
创建Pair RDD
Python中,为了让提取键之后的数据能够在函数中使用,需要返回一个由二元组组成的RDD。
pairs = lines.map(lambda x: (x.split(" "), x))
Pair RDD的转换操作
Pair RDD可以使用所以标准RDD上的可用的转化操作。同样,传递函数规则也适用于Pair RDD。
- Pair RDD的转化操作,以键值对集合{(1,2), (3,4), (3,6)}为例
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
reduceByKey(func) | 合并具有相同键的值 | rdd.reduceByKey(lambda x, y: x+y) | {(1,2), (3,10)} |
groupByKey() | 对具有相同键的值进行分组 | rdd.groupByKey() | {(1,[2]), (3, [4, 6])} |
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) | 使用不同的返回类型合并具有相同键的值 | ||
mapValues(func) | 对pair RDD的每个值应用一个函数而不改变键 | rdd.mapValues(lambda x: x+1) | {(1, 3), (3, 5), (3, 7)} |
flatMapValues(func) | 对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素生成一个对应原键的键值对记录。通常用于符号化。 | >>>def s(x): return x >>>rdd.flatMapValues(s).collect() #此时的rdd,二元组中的value应该为集合(list等) |
|
keys | 返回一个仅包含键的RDD | rdd.keys() | [1,3,3] |
values | 返回一个仅包含值的RDD | rdd.values() | [2,4,6] |
sortByKey() | 返回一个根据键排序的RDD | rdd.sortByKey() | [(1,2), (3, 4), (3, 6)] |
- 针对两个pair RDD的转化操作(rdd=[(1,2), (3,4), (3,6)] other=[(3,9)])
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
subtractByKey() | 删除RDD中键与otherRDD中的键相同的元素 | rdd.subtractByKey(other) | [(1,2)] |
join | 对两个RDD进行内连接 | rdd.join(other) | [(3, (4,9)), (3, (6,9))] |
rightOuterJoin | 对两个RDD进行连接操作,确保第一个RDD的键必修存在(右外连接) | rdd.rightOuterJoin(other) | [(3,(Some(4), 9)), (3, (Some(6), 9))] |
leftOuterJoin | 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) | rdd.leftOuterJoin(other) | [(1, (2, None)), (3, (4, Some(9))), (3, 6, Some(9))] |
cogroup | 将两个RDD中拥有相同键的数据分组 | rdd.cogroup(other) | [(1,([2], [])), (3, ([4,6], [9]))] |
聚合操作(转化操作)
聚合操作:聚合具有相同键的因素进行一些统计。
-
reduceByKey
为数据集中的每个键进行并行的规约操作,每个归约操作会将键相同的值合并。 -
foleByKey
类似fold,使用合并函数对零值与另一个元素进行合并,并将结果返回。
# 在Python中使用reduceByKey()和mapValues()计算每个键对应的平均值
rdd.mapValues(lambda x: (x, 1)).reduceBykey(lambda x, y: (x[0]+y[0], x[1]+y[1])).mapValues(lambda x: x[0]/x[1])
# 用python实现单词计数:method1
filerdd = sc.textFile('file:///G:/spark/README.md')
words = filerdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
# 用python实现单词计数:method2
filerdd = sc.textFile('file:///G:/spark/README.md')
words = filerdd.map(lambda x: x.split(" ")).countByValue()
返回结果为defaultdict.
3. combineByKey(createCombiner, mergeValue, mergeCombiners,partitioner)
combineByKey处理数据过程:
combinerByKey处理时,会遍历分区中的所有元素。
- 当遇见的元素是新的元素,会使用 createCombiner来创建对应的累加器的初始值。
- 当遇见的元素是已经遇见过的,mergeValue会将该键是累加器对应的当前值与遇见的这个键的值进行合并。
- 由于每个分区都是独立处理的,因此对于同一个键可以由多个累加器。如果两个或者更多的分区都有对应同一个键的累加器,mergeCombiners就会将各个分区的结果进行合并。
# Python中使用combineByKey求每个键对应的平均值
def createCombiner(value):
return (value, 1)
def mergeValue(acc, value):
return (acc[0]+value, acc[1]+1)
def mergeCombiners(acc1, acc2):
return (acc1[0]+acc2[0], acc1[1]+ acc2[1])
sumCount = nums.combineByKey(createCombiner, mergeValue, mergeCombiners)
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()
4. 并行度调优
每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作的并行度。
Spark会尝试根据集群的大小推断出一个有意义的默认值,也可以人工设置并行度。
例如:
# 在Python中自定义reduceByKey的并行度
data = [('a', 3), ('b', 4), ('a', 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x+y) #默认并行度
sc.parallelize(data).reduceByKey(lambda x, y: x+y , 10) # 自定义并行度
除分组和聚合操作外,在其他操作中,修改兵并行度,可以使用,repartition()和coalesce()。
数据分组
对于有键的数据,可以根据键将数据进行分组。例如:查看顾客的所有订单。
groupByKey()和cogroup():使用RDD中的键对数据进行分组。
连接
连接类型:右外连接、左外连接、交叉连接、内连接。
- join:内连接
只有在两个pair RDD都存在的键才输出。
- 左外连接
要求源RDD中键必须存在,连接的RDD中对应键的value如果不存在,则用None代替。
- 右外连接
要求连接的RDD中的键应该都存在,源RDD中对应的键不存在,则用None代替。
数据排序
Pair RDD的行动操作
Pair RDD的行动操作集合,以键值对[(1,2), (3, 4),(3,6)]为示例
函数名 | 描述 | 示例 | 结果 |
---|---|---|---|
countByKey() | 对每个键对应的元素分别计数 | rdd.countByKey() | [(1,1), (3, 2)] |
collectAsMap() | 将结果以映射表的形式返回,以便查询 | rdd.collectAsMap() | Map[(1,2), (3,4), (3,6)] |
lookup(key) | 返回给定键对应的所有值 | rdd.lookup(3) | [4,6] |