Calculating an average with Spark Scala


Problem description:

How do I compute the average salary per location from the two datasets below using Spark Scala?

File1.csv (column 4 is the salary)

Ram, 30, Engineer, 40000 
Bala, 27, Doctor, 30000 
Hari, 33, Engineer, 50000 
Siva, 35, Doctor, 60000 

File2.csv (column 2 is the location)

Hari, Bangalore 
Ram, Chennai 
Bala, Bangalore 
Siva, Chennai 

The files above are not sorted. I need to join the two files and find the average salary per location. I tried the code below, but could not get it to work.

val salary = sc.textFile("File1.csv").map(e => e.split(",")) 
val location = sc.textFile("File2.csv").map(e.split(",")) 
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1))) 
val joinedData = joined.sortByKey() 
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2)) 
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))  
aggregatedDF.repartition(1).saveAsTextFile("output.txt") 

Please help with the code and show a sample of the output.

Many thanks.

I would use the DataFrame API; something like this should work:

// .toDF on an RDD of tuples needs the SparkSession implicits (already in scope in spark-shell)
import spark.implicits._

val salary = sc.textFile("File1.csv")
       .map(_.split(",").map(_.trim))
       .map{ case Array(name, _, _, salary) => (name, salary) }
       .toDF("name", "salary")

val location = sc.textFile("File2.csv")
       .map(_.split(",").map(_.trim))
       .map{ case Array(name, location) => (name, location) }
       .toDF("name", "location")

import org.apache.spark.sql.functions._ 

salary
    .join(location, Seq("name"))
    .groupBy($"location")
    .agg(avg($"salary").as("avg_salary"))
    .repartition(1)
    .write.csv("output.csv")
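Note that salary comes out of the split as a string; avg will cast it implicitly, but an explicit cast makes the intent clearer. An optional variation on the aggregation above (same salary and location DataFrames):

// Same join and aggregation, with the salary column cast explicitly to double.
salary
    .join(location, Seq("name"))
    .groupBy($"location")
    .agg(avg($"salary".cast("double")).as("avg_salary"))
    .show()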

So would the final output here look like this? – akrockz

+-----------+----------+
|  location |avg_salary|
+-----------+----------+
| Bangalore |     40000|
|   Chennai |    500000|
+-----------+----------+


One more question: suppose that instead of a salary, the column holds dimensions such as 600*200 (length*width). How would I find the averages in that case? e.g. Ram 600*200, Hari 700*300, and so on... – akrockz

You could do something like this:

val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim))
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim))
// join on name -> (name, (salary, location)), then re-key by location
val joined = salary.map(e => (e(0), e(3).toInt)).join(location.map(e => (e(0), e(1))))
val locSalary = joined.map(v => (v._2._2, v._2._1))
// accumulate (count, sum) per location, then divide to get the average
val averages = locSalary.aggregateByKey((0, 0))(
    (t, e) => (t._1 + 1, t._2 + e),
    (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
  ).mapValues(t => t._2 / t._1)

Then averages.take(10) gives:

res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000)) 
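To also write this result to a file, as the original attempt intended, a minimal sketch (saveAsTextFile writes a directory of part files; coalesce(1) forces a single part):

// Format each (location, average) pair as a CSV-style line and write it out.
averages
    .map { case (loc, avgSalary) => s"$loc,$avgSalary" }
    .coalesce(1)
    .saveAsTextFile("output")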

Thanks for the reply. Suppose that instead of a salary, the column holds dimensions such as 600*200 (length*width). How would I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


Are the dimensions given as strings? Do you want to average the area (length times width), or average each dimension separately? – Harald


I want the average of each of those dimensions, grouped by location. – akrockz
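Following up on that comment thread, a minimal RDD-style sketch of a per-dimension average (hypothetical: it assumes File1.csv carries the dimensions in a 5th column such as "600*200", as in the updated sample further down, and reuses the trimmed location RDD from the answer above):

// Parse "length*width" from the assumed 5th column: (name, (length, width)).
val dims = sc.textFile("File1.csv")
    .map(_.split(",").map(_.trim))
    .map { e =>
      val Array(len, width) = e(4).split("\\*").map(_.toInt)
      (e(0), (len, width))
    }

// Join with (name, location), re-key by location, and average each dimension.
val avgDims = dims
    .join(location.map(e => (e(0), e(1))))
    .map { case (_, ((len, width), loc)) => (loc, (len, width, 1)) }
    .reduceByKey { case ((l1, w1, c1), (l2, w2, c2)) => (l1 + l2, w1 + w2, c1 + c2) }
    .mapValues { case (l, w, c) => (l.toDouble / c, w.toDouble / c) }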

I would use DataFrames. First, read the files in as DataFrames:

val salary = spark.read.option("header", "true").csv("File1.csv") 
val location = spark.read.option("header", "true").csv("File2.csv") 

If your files have no header, set the option to "false" and rename the default columns, for example with toDF (or withColumnRenamed):

val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary") 
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location") 

Now do the join:

val joined = salary.join(location, "name") 

Finally, compute the average:

import org.apache.spark.sql.functions._  // for avg; the $"..." syntax also needs spark.implicits._
val avgDf = joined.groupBy("location").agg(avg($"salary"))

To save it:

avgDf.repartition(1).write.csv("output.csv")
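As a side note, coalesce(1) achieves the same single-output-file effect as repartition(1) without a full shuffle; a small optional variant:

// coalesce avoids the extra shuffle; overwrite mode allows re-running against the same path.
avgDf.coalesce(1).write.mode("overwrite").csv("output.csv")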

Thanks for the reply. Suppose that instead of a salary, the column holds dimensions such as 600*200 (length*width). How would I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


What do you mean? Do you mean you have several such columns for each name? –

You can read the CSV files as DataFrames, then join and group them to get the averages:

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
    "name", "age", "title", "salary" 
) 

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
    "name", "location" 
) 

import org.apache.spark.sql.functions._ 

val dfAverage = df1.join(df2, Seq("name")). 
    groupBy(df2("location")).agg(avg(df1("salary")).as("average")). 
    select("location", "average") 

dfAverage.show 
+----------+-------+
|  location|average|
+----------+-------+
|Bangalore |40000.0|
|  Chennai |50000.0|
+----------+-------+

[Update] To compute the average dimensions:

// file1.csv: 
Ram,30,Engineer,40000,600*200 
Bala,27,Doctor,30000,800*400 
Hari,33,Engineer,50000,700*300 
Siva,35,Doctor,60000,600*200 

// file2.csv 
Hari,Bangalore 
Ram,Chennai 
Bala,Bangalore 
Siva,Chennai 

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
    "name", "age", "title", "salary", "dimensions" 
) 

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
    "name", "location" 
) 

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.IntegerType 

val dfAverage = df1.join(df2, Seq("name")).
    groupBy(df2("location")).
    agg(
      avg(split(df1("dimensions"), "\\*").getItem(0).cast(IntegerType)).as("avg_length"),
      avg(split(df1("dimensions"), "\\*").getItem(1).cast(IntegerType)).as("avg_width")
    ).
    select(
      $"location", $"avg_length", $"avg_width",
      concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
    )

dfAverage.show 
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore|     750.0|    350.0|   750.0*350.0|
|  Chennai|     600.0|    200.0|   600.0*200.0|
+---------+----------+---------+--------------+
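If the trailing ".0" in avg_dimensions is unwanted, the averages can be rounded and cast before concatenating (an optional variation, not part of the original answer):

// Round each average to a whole number before building the "length*width" string.
val dfAverageClean = dfAverage.withColumn(
    "avg_dimensions",
    concat(round($"avg_length").cast("int"), lit("*"), round($"avg_width").cast("int"))
)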

Thanks for the reply. Suppose that instead of a salary, the column holds dimensions such as 600*200 (length*width). How would I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


@akrockz, see the expanded answer. –


Thanks a lot @Leo C.. this is exactly what I was looking for.. One last request.. I don't have Spark installed on my laptop at the moment.. If I mail you the input data, could you send me the output? Sorry to ask so much.. thanks – akrockz