如何使用spark组数据集
问题描述:
我正在使用Spark Dataset(Spark 1.6.1版本)。 下面是我的代码如何使用spark组数据集
object App {
val conf = new SparkConf()
.setMaster("local")
.setAppName("SparkETL")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._
}
override def readDataTable(tableName:String):DataFrame={
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP);
return dataFrame;
}
case class Student(stud_id , sname , saddress)
case class Student(classid, stud_id, name)
var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student")
var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student")
var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff")
现在我想BY子句对多个列执行组? 如何做到这一点? result.groupBy(_._1._1.created_at)
这样我可以做到吗? 如果是的话,那么我不能看到作为一个组的结果也是如何在多列上做到这一点?
答
如果我已经正确理解了您的要求,那么您最好的选择是在PairRDDFunctions类中使用reduceByKey
函数。
函数的签名是,它只是表示您使用一系列键/值对。
让我解释一下工作流程:
- 你找回你MANT与之合作的集(在你的代码:
result
) - 随着RDD
map
功能拆分的结果包含两个元组集(例如:result.map(row => ((row.key1, row.key2), (row.value1, row.value2))
) - 现在你有一个RDD [(K,V)],其中类型K是键字段元组的类型,V是类型值字段元组
- 您可以直接使用
reduceByKey
通过传递(V,V) => V
类型的函数,聚合值(例如:(agg: (Int, Int), val: (Int, Int)) => (agg._1 + val._1, agg._2 + val._2)
)
请注意:
- 你必须从聚合函数返回相同的值类型
- 您必须导入
org.apache.spark.SparkContext._
才能自动使用PairRDDFunctions实用功能 - 与
groupBy
相同的推理,您必须从第e启动RDD到一对RDD[K,V]
,但是您没有聚合函数,因为您只是将值存储在seq中以用于进一步计算 - 如果您需要聚合的起始值(例如:用于计数),请改用
foldByKey
功能