Scala加入withCassandra表结果(或CassandraTableScanRDD)到数据集
我使用Datastax spark-cassandra-connector
访问Cassandra中的一些数据。Scala加入withCassandra表结果(或CassandraTableScanRDD)到数据集
为了能够有效地访问我需要查询的所有数据,我必须使用joinWithCassandraTable
方法从一堆分区中取回数据。这给了我一个类com.datastax.spark.connector.rdd.CassandraTableScanRDD
(或类似的,测试我实际上只是使用标准的sc.cassandraTable(ks, tbl)
方法来读取数据)的对象。
问题是,我需要在结果对象上使用的所有方法都需要类org.apache.spark.sql.Dataset
的对象。
我已经做了很多搜索,并且一直没能找到任何帮助 - 我发现的最接近的是this类似的问题,我不认为它已经得到了充分的回答,因为它忽略了使用情况下,访问所有必要数据的推荐方法是使用joinWithCassandraTable
。
我也是新来的java和斯卡拉,所以对不起,如果我有点慢。任何帮助都会受到极大的赞赏,因为我在这一点上很困难。
感谢, AKHIL
你可以做的是读你的RDD到RDD [行],然后更改成数据帧。我们唯一的问题是我们也需要Schema。所以让我们分两步来做。
首先让我们结合目标
val schema = spark.read.cassandraFormat("dogabase", "test").load.schema
/**
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(owner,StringType,true),
StructField(dog_id,IntegerType,true),
StructField(dog_age,IntegerType,true),
StructField(dog_name,StringType,true))
**/
获取架构编程然后我们就可以让org.apache.spark.sql.Row
对象我们卡桑德拉驱动 行。
import org.apache.spark.sql.Row
val joinResult =
sc.parallelize(Seq(Tuple1("Russ")))
.joinWithCassandraTable("test", "dogabase")
.map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row
现在,我们有一个架构和RDD [行]我们可以用火花会议
val dataset = spark.createDataFrame(joinResult, schema)
dataset.show
/**
+-----+------+-------+--------+
|owner|dog_id|dog_age|dog_name|
+-----+------+-------+--------+
| Russ| 1| 10| cara|
| Russ| 2| 11|sundance|
+-----+------+-------+--------+
**/
而只是柜面你不相信我的createDataFrame方法,一个数据帧是数据集
dataset.getClass
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset
编辑:可能需要的转换器
一些卡桑德拉类型无效b Spark行的asis,所以你可能需要转换它们。这可以通过编写快速转换功能来完成。不幸的是,SCC使用的内置转换使内部表示成为可能,因此我们无法使用这些转换。
def convertToSpark(element:Any): Any = {
case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate //Convert to java.util.Date
case other => other
}
使你行
cassandraRow.columnValues.map(convertToSpark):_*
这是梦幻般的,也解决了一堆我一直有其他问题时,则...太感谢你了!我会明天实施这个,并会让你知道我是如何得到的:) –
抱歉再次打扰你 - 这似乎是非常接近工作,除了我的卡桑德拉领域之一是日期,我看到的例外'编码时出错:java.lang.RuntimeException:org.joda.time.LocalDate不是日期模式的有效外部类型。 Do'u知道这是否有明显的修复我失踪?再次感谢 –
哦星火:)问题是cassandra驱动返回的类型“joda的localdate”与Spark不兼容。所以你需要做的就是将这些LocalDate转换为spark兼容类型。我建议您使用内置转换器的连接器,但这些连接器的目标是内部表示,并且也不允许用于外部源。我将在上面的答案中提供一个转换类型的代码示例。 – RussS