Mapping over each value of an array in a Spark column

Problem description:

I have a JSON dataset in the following format, one entry per line.

{ "sales_person_name" : "John", "products" : ["apple", "mango", "guava"]} 
{ "sales_person_name" : "Tom", "products" : ["mango", "orange"]} 
{ "sales_person_name" : "John", "products" : ["apple", "banana"]} 
{ "sales_person_name" : "Steve", "products" : ["apple", "mango"]} 
{ "sales_person_name" : "Tom", "products" : ["mango", "guava"]} 

I want to know who sold the most mangoes, and so on. So I want to load the file into a DataFrame and, for each transaction, emit one (key, value) = (product, sales_person_name) pair per product value in the array.

var df = spark.read.json("s3n://sales-data.json")
df.printSchema()
root
 |-- sales_person_name: string (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: string (containsNull = true)

var nameProductsMap = df.select("sales_person_name", "products").show()
+-----------------+--------------------+
|sales_person_name|            products|
+-----------------+--------------------+
|             John|[apple, mango, gu...|
|              Tom|     [mango, orange]|
|             John|     [apple, banana]|
...

var resultMap = df.select("products", "sales_person_name")
  .map(r => (r(1), r(0)))
  .show() // This is where I am stuck.

I can't figure out the right way to explode() r(0) and emit one pair per element alongside the r(1) value. Can anyone suggest an approach? Thanks!

What is the expected output for the given example? – Nyavro

Mango: John(4), Tom(2), Greg(1)... Banana: Tom(5), John(2)... – lazywiz

I was thinking of something like: var actorHashtagsMap = df.select("products", "sales_person_name").map(r => r(0).map(x => (x, r(1)))) – lazywiz
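
A runnable version of that idea (a sketch, assuming Spark 2.x with the session's spark.implicits._ in scope for the tuple encoder) would use flatMap with typed getters:

import spark.implicits._

// emit one (product, seller) pair per array element
val pairs = df.select("products", "sales_person_name")
  .flatMap(r => r.getSeq[String](0).map(p => (p, r.getString(1))))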

UPDATE

The following code should work:

import org.apache.spark.sql.functions.explode
import scala.collection.mutable
import spark.implicits._ // needed for the $"..." column syntax

val resultMap = df.select(explode($"products"), $"sales_person_name") 
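
A quick sanity check (explode names its generated column col by default; row order may differ):

resultMap.show(3)
// +-----+-----------------+
// |  col|sales_person_name|
// +-----+-----------------+
// |apple|             John|
// |mango|             John|
// |guava|             John|
// +-----+-----------------+
// only showing top 3 rows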


// tally how many times each element occurs in the collection
def counter(l: TraversableOnce[Any]) = {
  val temp = mutable.Map[Any, Int]()
  for (i <- l) {
    if (temp.contains(i)) temp(i) += 1
    else temp(i) = 1
  }
  temp
}
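
A quick plain-Scala check of counter:

// prints Map(John -> 2, Tom -> 1) (entry order may vary)
println(counter(Seq("John", "Tom", "John")))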

resultMap.map(x => (x(0), Array(x(1)))).
  reduceByKey(_ ++ _).
  map { case (x, y) => (x, counter(y).toArray) }

Resulting output: Array((banana,Array((John,1))), (guava,Array((Tom,1), (John,1))), (orange,Array((Tom,1))), (apple,Array((Steve,1), (John,2))), (mango,Array((Tom,2), (Steve,1), (John,1))))

This is an intermediate step. I eventually want to reduce it by the product column, e.g. Apple: John(4), Tom(2), Steve(1); Mango: Steve(3), Tom(1); ... – lazywiz

Updated the answer with the reduce operation. – septra

Thanks for the code @septra. However, I'm getting the following error: Error: value reduceByKey is not a member of org.apache.spark.sql.Dataset[(Any, Array[Any])] possible cause: maybe a semicolon is missing before 'value reduceByKey'? I get this no matter what kind of map() I run on the result set. I'm using Spark 2.0. Any clue? – lazywiz
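
That error arises because reduceByKey is defined in the RDD API, not on Dataset. A minimal workaround sketch, assuming the resultMap and counter definitions above, is to drop to the underlying RDD first:

// reduceByKey lives in the RDD API, so convert the Dataset first
val counted = resultMap.rdd
  .map(x => (x(0), Array(x(1))))
  .reduceByKey(_ ++ _)
  .map { case (product, sellers) => (product, counter(sellers).toArray) }

counted.collect().foreach(println)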

import scala.collection.mutable

val exploded = df.explode("products", "product") { a: mutable.WrappedArray[String] => a }
val result = exploded.drop("products")
result.show()

This prints:

+-----------------+-------+
|sales_person_name|product|
+-----------------+-------+
|             John|  apple|
|             John|  mango|
|             John|  guava|
|              Tom|  mango|
|              Tom| orange|
|             John|  apple|
|             John| banana|
|            Steve|  apple|
|            Steve|  mango|
|              Tom|  mango|
|              Tom|  guava|
+-----------------+-------+
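
From here, the original question (who sold the most of each product) is a plain aggregation; a minimal sketch, assuming spark.implicits._ is in scope for the $ syntax:

import spark.implicits._

// count sales per (product, seller) and rank sellers within each product
result.groupBy("product", "sales_person_name")
  .count()
  .orderBy($"product", $"count".desc)
  .show()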

Thanks Zohar! You make it look easy. I had to add the import for scala.collection.mutable and call explode on the select() result rather than on df. I see the idea now and it's simple after all. Thanks! – lazywiz