Scala加入withCassandra表结果(或CassandraTableScanRDD)到数据集

问题描述:

我使用Datastax spark-cassandra-connector访问Cassandra中的一些数据。Scala加入withCassandra表结果(或CassandraTableScanRDD)到数据集

为了能够有效地访问我需要查询的所有数据,我必须使用joinWithCassandraTable方法从一堆分区中取回数据。这给了我一个类com.datastax.spark.connector.rdd.CassandraTableScanRDD(或类似的,测试我实际上只是使用标准的sc.cassandraTable(ks, tbl)方法来读取数据)的对象。

问题是,我需要在结果对象上使用的所有方法都需要类org.apache.spark.sql.Dataset的对象。

我已经做了很多搜索,并且一直没能找到任何帮助 - 我发现的最接近的是this类似的问题,我不认为它已经得到了充分的回答,因为它忽略了使用情况下,访问所有必要数据的推荐方法是使用joinWithCassandraTable

我也是新来的java和斯卡拉,所以对不起,如果我有点慢。任何帮助都会受到极大的赞赏,因为我在这一点上很困难。

感谢, AKHIL

你可以做的是读你的RDD到RDD [行],然后更改成数据帧。我们唯一的问题是我们也需要Schema。所以让我们分两步来做。

首先让我们结合目标

val schema = spark.read.cassandraFormat("dogabase", "test").load.schema 

/** 
schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(owner,StringType,true), 
StructField(dog_id,IntegerType,true), 
StructField(dog_age,IntegerType,true), 
StructField(dog_name,StringType,true)) 
**/ 

获取架构编程然后我们就可以让org.apache.spark.sql.Row对象我们卡桑德拉驱动 行。

import org.apache.spark.sql.Row 
val joinResult = 
    sc.parallelize(Seq(Tuple1("Russ"))) 
    .joinWithCassandraTable("test", "dogabase") 
    .map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row 

现在,我们有一个架构和RDD [行]我们可以用火花会议

val dataset = spark.createDataFrame(joinResult, schema) 
dataset.show 

/** 
+-----+------+-------+--------+ 
|owner|dog_id|dog_age|dog_name| 
+-----+------+-------+--------+ 
| Russ|  1|  10| cara| 
| Russ|  2|  11|sundance| 
+-----+------+-------+--------+ 
**/ 

而只是柜面你不相信我的createDataFrame方法,一个数据帧是数据集

dataset.getClass 
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset 

编辑:可能需要的转换器

一些卡桑德拉类型无效b Spark行的asis,所以你可能需要转换它们。这可以通过编写快速转换功能来完成。不幸的是,SCC使用的内置转换使内部表示成为可能,因此我们无法使用这些转换。

def convertToSpark(element:Any): Any = { 
    case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate //Convert to java.util.Date 
    case other => other 
} 

使你行

cassandraRow.columnValues.map(convertToSpark):_* 
+0

这是梦幻般的,也解决了一堆我一直有其他问题时,则...太感谢你了!我会明天实施这个,并会让你知道我是如何得到的:) –

+0

抱歉再次打扰你 - 这似乎是非常接近工作,除了我的卡桑德拉领域之一是日期,我看到的例外'编码时出错:java.lang.RuntimeException:org.joda.time.LocalDate不是日期模式的有效外部类型。 Do'u知道这是否有明显的修复我失踪?再次感谢 –

+0

哦星火:)问题是cassandra驱动返回的类型“joda的localdate”与Spark不兼容。所以你需要做的就是将这些LocalDate转换为spark兼容类型。我建议您使用内置转换器的连接器,但这些连接器的目标是内部表示,并且也不允许用于外部源。我将在上面的答案中提供一个转换类型的代码示例。 – RussS