如何更改Spark数据集上的模式

问题描述:

当我使用select语句在Spark 2中检索数据集时,基础列会继承查询列的数据类型。如何更改Spark数据集上的模式

val ds1 = spark.sql("select 1 as a, 2 as b, 'abd' as c") 

ds1.printSchema() 
root 
|-- a: integer (nullable = false) 
|-- b: integer (nullable = false) 
|-- c: string (nullable = false) 

现在,如果我将它转换为case类,它将正确转换值,但底层架构仍然是错误的。

case class abc(a: Double, b: Double, c: String) 
val ds2 = ds1.as[abc] 
ds2.printSchema() 
root 
|-- a: integer (nullable = false) 
|-- b: integer (nullable = false) 
|-- c: string (nullable = false) 

ds2.collect 
res18: Array[abc] = Array(abc(1.0,2.0,abd)) 

我“应该”可以指定编码器,当我创建第二个数据集,但斯卡拉似乎忽略此参数来使用(这是一个BUG?):

val abc_enc = org.apache.spark.sql.Encoders.product[abc] 

val ds2 = ds1.as[abc](abc_enc) 

ds2.printSchema 
root 
|-- a: integer (nullable = false) 
|-- b: integer (nullable = false) 
|-- c: string (nullable = false) 

所以只有这样我才能看到这样做,简单地说,没有非常复杂的映射就是使用createDataset,但是这需要在底层对象上进行收集,所以它并不理想。

val ds2 = spark.createDataset(ds1.as[abc].collect) 

您可以简单地使用上columnscast方法

import sqlContext.implicits._ 
val ds2 = ds1.select($"a".cast(DoubleType), $"a".cast(DoubleType), $"c") 
ds2.printSchema() 

你应该有

root 
|-- a: double (nullable = false) 
|-- a: double (nullable = false) 
|-- c: string (nullable = false) 

你也可以转换列,同时使用SQL查询选择如下

import spark.implicits._ 

val ds = Seq((1,2,"abc"),(1,2,"abc")).toDF("a", "b","c").createOrReplaceTempView("temp") 

val ds1 = spark.sql("select cast(a as Double) , cast (b as Double), c from temp") 

ds1.printSchema() 

这有模式作为

root 
|-- a: double (nullable = false) 
|-- b: double (nullable = false) 
|-- c: string (nullable = true) 

现在你可以转换为数据集与案例类

case class abc(a: Double, b: Double, c: String) 

val ds2 = ds1.as[abc] 
ds2.printSchema() 

现在有需要的架构

root 
|-- a: double (nullable = false) 
|-- b: double (nullable = false) 
|-- c: string (nullable = true) 

希望这有助于!

好的,我想我已经以更好的方式解决了这个问题。

当我们创建一个新的数据集时,我们可以引用数据集的rdd而不是使用collect。

所以不是

val ds2 = spark.createDataset(ds1.as[abc].collect) 

我们使用:

val ds2 = spark.createDataset(ds1.as[abc].rdd) 

ds2.printSchema 
root 
|-- a: double (nullable = false) 
|-- b: double (nullable = false) 
|-- c: string (nullable = true) 

这使懒惰的评估不变,但允许新的数据集采用编码器为ABC的情况下类,以及随后的架构当我们用它来创建一个新表格时会反映这一点。

这是Spark API开放的问题(检查这张票SPARK-17694

所以,你需要做的是做一个额外的显式类型转换。像这样的东西应该工作:

ds1.as[abc].map(x => x : abc)