检查如果一个RDD(K,V)V是包含在另一个R dd(K,V)V
问题描述:
我有两个RDD(K,V),在火花它不允许两个映射嵌套。检查如果一个RDD(K,V)V是包含在另一个R dd(K,V)V
val x = sc.parallelize(List((1,"abc"),(2,"efg")))
val y = sc.parallelize(List((1,"ab"),(2,"ef"), (3,"tag"))
如果RDD很大,我想检查“abc”是否包含“ab”。
答
假设你想从RDD X时,它的子选择一个值出现在RDDŸ那么这段代码应该工作。
def main(args: Array[String]): Unit = {
val x = spark.sparkContext.parallelize(List((1, "abc"), (2, "efg")))
val y = spark.sparkContext.parallelize(List((1, "ab"), (2, "ef"), (3, "tag")))
// This RDD is filtered. That is we are selecting elements from x only if the substring of the value is present in
// the RDD y.
val filteredRDD = filterRDD(x, y)
// Now we map the filteredRDD to our result list
val resultArray = filteredRDD.map(x => x._2).collect()
}
def filterRDD(x: RDD[(Int, String)], y: RDD[(Int, String)]): RDD[(Int, String)] = {
// Broadcasting the y RDD to all spark nodes, since we are collecting this before hand.
// The reason we are collecting the y RDD is to avoid call collect in the filter logic
val y_bc = spark.sparkContext.broadcast(y.collect.toSet)
x.filter(m => {
y_bc.value.exists(n => m._2.contains(n._2))
})
}
+1
谢谢,数据量小时使用广播。 – ozil
您是否可以使用所需的输出更新您正在查找的问题。 –
谢谢,我想知道“abc”是否包含“ab”,输出如(abc,efg) – ozil