Calculating an average with Spark Scala
How do I compute the average salary per location across two datasets in Spark Scala?
File1.csv (column 4 is the salary):
Ram, 30, Engineer, 40000
Bala, 27, Doctor, 30000
Hari, 33, Engineer, 50000
Siva, 35, Doctor, 60000
File2.csv (column 2 is the location):
Hari, Bangalore
Ram, Chennai
Bala, Bangalore
Siva, Chennai
The files above are not sorted. I need to join the two files and find the average salary for each location. I tried the code below, but could not get it to work.
val salary = sc.textFile("File1.csv").map(e => e.split(","))
val location = sc.textFile("File2.csv").map(e.split(","))
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1)))
val joinedData = joined.sortByKey()
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2))
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))
aggregatedDF.repartition(1).saveAsTextFile("output.txt")
Please take a look at the code; a sample of the expected output would also help.
Many thanks.
I would use the DataFrame API; this should work:
import spark.implicits._ // required for toDF

val salary = sc.textFile("File1.csv")
  .map(_.split(",").map(_.trim))
  .map { case Array(name, _, _, salary) => (name, salary) }
  .toDF("name", "salary")
val location = sc.textFile("File2.csv")
  .map(_.split(",").map(_.trim))
  .map { case Array(name, location) => (name, location) }
  .toDF("name", "location")
import org.apache.spark.sql.functions._
salary
  .join(location, Seq("name"))
  .groupBy($"location")
  .agg(avg($"salary").as("avg_salary"))
  .repartition(1)
  .write.csv("output.csv")
With plain RDDs you can do something like this:
val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim))
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim))
val joined = salary.map(e => (e(0), e(3).toInt)).join(location.map(e => (e(0), e(1))))
val locSalary = joined.map(v => (v._2._2, v._2._1))
val averages = locSalary
  .aggregateByKey((0, 0))(
    (t, e) => (t._1 + 1, t._2 + e),
    (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
  .mapValues(t => t._2 / t._1)
Then averages.take(10) gives:
res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000))
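To see why the (0, 0) zero value and the two functions passed to aggregateByKey produce these numbers, here is a minimal plain-Scala sketch of the same (count, sum) accumulation, with two simulated partitions per key so the merge function is exercised. The sample values are taken from the files above; no Spark is needed to run it.

```scala
// aggregateByKey combines a zero value, a per-partition seqOp, and a cross-partition combOp.
val zero = (0, 0)                                                       // (count, sum)
val seqOp = (t: (Int, Int), v: Int) => (t._1 + 1, t._2 + v)             // fold one value into an accumulator
val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // merge two accumulators

val locSalary = Seq("Chennai" -> 40000, "Bangalore" -> 30000,
                    "Bangalore" -> 50000, "Chennai" -> 60000)

val averages = locSalary
  .groupBy(_._1)
  .map { case (loc, rows) =>
    // split each key's values into two "partitions" so combOp is exercised
    val (p1, p2) = rows.map(_._2).splitAt(1)
    val merged = combOp(p1.foldLeft(zero)(seqOp), p2.foldLeft(zero)(seqOp))
    loc -> merged._2 / merged._1 // integer division, matching the RDD version
  }
// averages == Map("Chennai" -> 50000, "Bangalore" -> 40000)
```

Because the division is integer division on the summed Ints, the result is an Int average, which is why the RDD answer prints Array((Chennai,50000), (Bangalore,40000)) rather than doubles.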
I would use DataFrames. First read them in like this:
val salary = spark.read.option("header", "true").csv("File1.csv")
val location = spark.read.option("header", "true").csv("File2.csv")
If your files have no header row, you need to set the option to "false" and assign column names with toDF (or change them one at a time with withColumnRenamed):
val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary")
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location")
Now do the join:
val joined = salary.join(location, "name")
Finally, compute the average (note it is groupBy, not groupby, and avg needs an import; naming the result avgDF avoids shadowing the avg function):
import org.apache.spark.sql.functions.avg
val avgDF = joined.groupBy("location").agg(avg($"salary"))
To save it:
avgDF.repartition(1).write.csv("output.csv")
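One detail worth noting: the join on "name" above is an inner join, so a name that appears in only one of the files drops out before the average is taken. A plain-Scala sketch of that behavior, using the sample rows from the files plus a hypothetical "Anil" (my addition, not in the original data) who has no location row:

```scala
val salaries = Map("Ram" -> 40000.0, "Bala" -> 30000.0, "Hari" -> 50000.0,
                   "Siva" -> 60000.0, "Anil" -> 99999.0) // Anil: hypothetical, no location row
val locations = Map("Hari" -> "Bangalore", "Ram" -> "Chennai",
                    "Bala" -> "Bangalore", "Siva" -> "Chennai")

// Inner join on name: Anil is dropped, so his salary never skews an average.
val joined = salaries.toSeq.flatMap { case (name, sal) =>
  locations.get(name).map(loc => (loc, sal))
}
val avgSalary = joined.groupBy(_._1).map { case (loc, g) =>
  loc -> g.map(_._2).sum / g.size
}
// avgSalary == Map("Bangalore" -> 40000.0, "Chennai" -> 50000.0)
```

If you wanted unmatched names to survive (e.g. with a null location), you would pass a join type such as "left_outer" as the third argument to the DataFrame join instead.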
Thanks for the reply. Suppose that instead of a salary the column holds dimensions such as 600*200 (length*width); how do I find the average in that case? Ram 600*200, Hari 700*300, and so on... – akrockz
What do you mean? Do you have several such columns for each name? –
You can read the CSV files as DataFrames, then join and group them to get the averages:
val df1 = spark.read.csv("/path/to/file1.csv").toDF(
"name", "age", "title", "salary"
)
val df2 = spark.read.csv("/path/to/file2.csv").toDF(
"name", "location"
)
import org.apache.spark.sql.functions._
val dfAverage = df1.join(df2, Seq("name")).
groupBy(df2("location")).agg(avg(df1("salary")).as("average")).
select("location", "average")
dfAverage.show
+---------+-------+
| location|average|
+---------+-------+
|Bangalore|40000.0|
|  Chennai|50000.0|
+---------+-------+
[Update] To compute the average dimensions:
// file1.csv:
Ram,30,Engineer,40000,600*200
Bala,27,Doctor,30000,800*400
Hari,33,Engineer,50000,700*300
Siva,35,Doctor,60000,600*200
// file2.csv
Hari,Bangalore
Ram,Chennai
Bala,Bangalore
Siva,Chennai
val df1 = spark.read.csv("/path/to/file1.csv").toDF(
"name", "age", "title", "salary", "dimensions"
)
val df2 = spark.read.csv("/path/to/file2.csv").toDF(
"name", "location"
)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
val dfAverage = df1.join(df2, Seq("name")).
groupBy(df2("location")).
agg(
avg(split(df1("dimensions"), "\\*").getItem(0).cast(IntegerType)).as("avg_length"),
avg(split(df1("dimensions"), "\\*").getItem(1).cast(IntegerType)).as("avg_width")
).
select(
$"location", $"avg_length", $"avg_width",
concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
)
dfAverage.show
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore| 750.0| 350.0| 750.0*350.0|
| Chennai| 600.0| 200.0| 600.0*200.0|
+---------+----------+---------+--------------+
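The split/cast/avg combination above can be checked without Spark. This plain-Scala sketch parses the same "length*width" strings with the same "\\*" regex, joins on name, and averages each component per location; the rows are the sample data from file1.csv and file2.csv above.

```scala
// Parse "600*200"-style strings, join on name, average length and width per location.
val rows = Seq(("Ram", "600*200"), ("Bala", "800*400"), ("Hari", "700*300"), ("Siva", "600*200"))
val locations = Map("Hari" -> "Bangalore", "Ram" -> "Chennai",
                    "Bala" -> "Bangalore", "Siva" -> "Chennai")

val avgDims = rows
  .flatMap { case (name, dims) =>
    val Array(len, wid) = dims.split("\\*").map(_.trim.toDouble) // same "\\*" regex as the Spark split
    locations.get(name).map(loc => (loc, len, wid))
  }
  .groupBy(_._1)
  .map { case (loc, g) =>
    loc -> (g.map(_._2).sum / g.size, g.map(_._3).sum / g.size)
  }
// avgDims == Map("Bangalore" -> (750.0, 350.0), "Chennai" -> (600.0, 200.0))
```

This matches the avg_length/avg_width columns in the DataFrame output above; the concat step then just glues the two averages back into a single "length*width" string.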
So the final output here would look like this?
+---------+----------+
| location|avg_salary|
+---------+----------+
|Bangalore|     40000|
|  Chennai|     50000|
+---------+----------+
– akrockz