从CSV文件创建Spark数据集
问题描述:
我想从简单的CSV文件创建Spark数据集。下面是CSV文件的内容:从CSV文件创建Spark数据集
name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"
这里是使数据集的代码:
var location = "s3a://path_to_csv"
case class City(name: String, state: String, number_of_people: Long)
val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]
以下是错误消息:“不能达到投number_of_people
从字符串到BIGINT,因为它可能截断“
Databricks谈论如何在this blog post中创建数据集和此特定错误消息。
编码器急切地检查你的数据预期的架构, 匹配提供错误信息帮助你试图将数据的错误 过程的TB之前。例如,如果我们尝试使用 太小的数据类型,例如转换为对象会导致 截断(即numStudents大于一个字节,其中最大值为255),分析器将发出一个数据类型为 AnalysisException。
我使用的是Long
类型,所以我没想到会看到这个错误信息。
答
使用架构推断:
val cities = spark.read
.option("inferSchema", "true")
...
或提供模式:
val cities = spark.read
.schema(StructType(Array(StructField("name", StringType), ...)
或投:
val cities = spark.read
.option("header", "true")
.csv(location)
.withColumn("number_of_people", col("number_of_people").cast(LongType))
.as[City]
答
与案例类城市(名称:字符串,状态:字符串, number_of_people:长), 你只需要一条线
private val cityEncoder = Seq(City("", "", 0)).toDS
,那么你的代码
val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]
将只是工作。
这是,得自这个网站的官方消息[http://spark.apache.org/docs/latest/sql-programming-guide.html#overview][1]