Mapping each value of an array in a Spark column
Problem description:
I have a JSON dataset in the following format, with one entry per line.
{ "sales_person_name" : "John", "products" : ["apple", "mango", "guava"]}
{ "sales_person_name" : "Tom", "products" : ["mango", "orange"]}
{ "sales_person_name" : "John", "products" : ["apple", "banana"]}
{ "sales_person_name" : "Steve", "products" : ["apple", "mango"]}
{ "sales_person_name" : "Tom", "products" : ["mango", "guava"]}
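I want to find out who sold the most mangoes, and so on. So I want to load the file into a DataFrame and, for each transaction, emit a (key, value) pair of (product, name) for every product value in the array.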
var df = spark.read.json("s3n://sales-data.json")
df.printSchema()
root
|-- sales_person_name: string (nullable = true)
|-- products: array (nullable = true)
var nameProductsMap = df.select("sales_person_name", "products").show()
+-----------------+--------------------+
|sales_person_name| products |
+-----------------+--------------------+
| John|[mango, apple,... |
| Tom|[mango, orange,... |
| John|[apple, banana... |
var resultMap = df.select("products", "sales_person_name")
.map(r => (r(1), r(0)))
.show() //This is where I am stuck.
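I can't figure out the right way to explode() row(0) and have each of its values emitted together with the row(1) value. Can anyone suggest an approach? Thanks!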
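The pairing asked for here (one (product, name) pair per element of the products array) is, outside Spark, just a flatMap over each record. A minimal sketch on plain Scala collections, assuming a hypothetical `Sale` case class that mirrors one JSON line; it is not Spark code, only an illustration of what explode() produces:

```scala
// Hypothetical record type mirroring one JSON line of the dataset
final case class Sale(salesPersonName: String, products: Seq[String])

object ProductPairs {
  // Emit one (product, name) pair per product in each record,
  // which is what explode() does for an array column in Spark
  def pairs(sales: Seq[Sale]): Seq[(String, String)] =
    sales.flatMap(s => s.products.map(p => (p, s.salesPersonName)))

  // The five sample lines from the question
  val sample: Seq[Sale] = Seq(
    Sale("John", Seq("apple", "mango", "guava")),
    Sale("Tom", Seq("mango", "orange")),
    Sale("John", Seq("apple", "banana")),
    Sale("Steve", Seq("apple", "mango")),
    Sale("Tom", Seq("mango", "guava"))
  )
}
```

Each record contributes as many pairs as it has products, so the five sample lines yield 11 pairs.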
Answer
UPDATE
The following code should work:
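import org.apache.spark.sql.functions.explode
import scala.collection.mutable
import spark.implicits._ // enables the $"..." column syntax

// One row per (product, sales_person_name) pair
val resultMap = df.select(explode($"products"), $"sales_person_name")

// Count how many times each element occurs in a collection
def counter(l: TraversableOnce[Any]) = {
  val temp = mutable.Map[Any, Int]()
  for (i <- l) {
    if (temp.contains(i)) temp(i) += 1
    else temp(i) = 1
  }
  temp
}

// Switch to the RDD API, which provides reduceByKey
resultMap.rdd.map(x => (x(0), Array(x(1)))).
  reduceByKey(_ ++ _).
  map { case (x, y) => (x, counter(y).toArray) }.
  collect()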
Resulting output: Array((banana,Array((John,1))), (guava,Array((Tom,1), (John,1))), (orange,Array((Tom,1))), (apple,Array((Steve,1), (John,2))), (mango,Array((Tom,2), (Steve,1), (John,1))))
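The `counter` helper above tallies occurrences by hand in a mutable map. The same per-product, per-person tally can be sketched with `groupBy` on plain Scala collections; the `ProductCounts` object and its hard-coded pairs are illustrative assumptions, not part of the answer:

```scala
object ProductCounts {
  // (product, salesPersonName) pairs, as produced by exploding the products array
  val pairs: Seq[(String, String)] = Seq(
    ("apple", "John"), ("mango", "John"), ("guava", "John"),
    ("mango", "Tom"), ("orange", "Tom"),
    ("apple", "John"), ("banana", "John"),
    ("apple", "Steve"), ("mango", "Steve"),
    ("mango", "Tom"), ("guava", "Tom")
  )

  // product -> (salesPersonName -> number of times that person sold it)
  def countsByProduct(ps: Seq[(String, String)]): Map[String, Map[String, Int]] =
    ps.groupBy(_._1).map { case (product, rows) =>
      product -> rows.groupBy(_._2).map { case (name, hits) => name -> hits.size }
    }
}
```

Using immutable maps avoids the `contains`/update bookkeeping, and the counts match the output shown above (for example, mango: Tom 2, Steve 1, John 1).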
Answer
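import scala.collection.mutable

// Dataset.explode is deprecated since Spark 2.0; functions.explode inside select() is the alternative
val exploded = df.explode("products", "product") { a: mutable.WrappedArray[String] => a }
val result = exploded.drop("products")
result.show()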
Prints:
+-----------------+-------+
|sales_person_name|product|
+-----------------+-------+
| John| apple|
| John| mango|
| John| guava|
| Tom| mango|
| Tom| orange|
| John| apple|
| John| banana|
| Steve| apple|
| Steve| mango|
| Tom| mango|
| Tom| guava|
+-----------------+-------+
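The question's actual goal is who sold the most of each product. Starting from exploded (sales_person_name, product) rows like the table above, a minimal plain-Scala sketch could count per person and pick the top seller; the `TopSeller` object and the tie-breaking rule (alphabetical by name) are assumptions made for illustration:

```scala
object TopSeller {
  // (sales_person_name, product) rows, in the order of the exploded table
  val rows: Seq[(String, String)] = Seq(
    ("John", "apple"), ("John", "mango"), ("John", "guava"),
    ("Tom", "mango"), ("Tom", "orange"),
    ("John", "apple"), ("John", "banana"),
    ("Steve", "apple"), ("Steve", "mango"),
    ("Tom", "mango"), ("Tom", "guava")
  )

  // product -> (top seller, count); ties are broken by name so the result is deterministic
  def topSellers(rs: Seq[(String, String)]): Map[String, (String, Int)] =
    rs.groupBy(_._2).map { case (product, sold) =>
      val perPerson = sold.groupBy(_._1).map { case (name, hits) => (name, hits.size) }
      product -> perPerson.toSeq.minBy { case (name, n) => (-n, name) }
    }
}
```

In Spark itself, the per-person counts would typically come from `groupBy("product", "sales_person_name").count()` on the exploded DataFrame rather than from collecting rows to the driver.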
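Thanks Zohar! You make it look easy. I had to add the import statement for mutable._ and do the explode on the select() result rather than on df. I get your idea; it's simple now. Thanks! – lazywiz
What is the expected output for the given example? – Nyavro
Mango: John(4), Tom(2), Greg(1) ... Banana: Tom(5), John(2) ... – lazywiz
I was thinking of something like this: var actorHashtagsMap = df.select("products", "sales_person_name").map(r => {0}.map(x => (x, r(1))) – lazywiz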
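The expected output lazywiz describes lists each product followed by its sellers and their counts. A minimal sketch of rendering one product's counts in that shape, assuming a hypothetical `ReportFormat.line` helper and highest-count-first ordering with ties sorted by name:

```scala
object ReportFormat {
  // Render one product's per-person counts as "product: Name(n), Name(n)",
  // highest count first, ties ordered alphabetically by name
  def line(product: String, counts: Map[String, Int]): String =
    counts.toSeq
      .sortBy { case (name, n) => (-n, name) }
      .map { case (name, n) => s"$name($n)" }
      .mkString(s"$product: ", ", ", "")
}
```

Applied to the per-product counts from the sample data, `ReportFormat.line("mango", Map("John" -> 1, "Tom" -> 2, "Steve" -> 1))` yields `mango: Tom(2), John(1), Steve(1)`.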