星火斯卡拉:转换任意N列到地图
问题描述:
我代表电影IDS(第一列)和评级为不同的用户对于电影中的其余列如下数据结构 - 这样的事情:星火斯卡拉:转换任意N列到地图
+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|movieId| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15|
+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| 1580|null|null| 3.5| 5.0|null|null|null|null|null|null|null|null|null|null|null|
| 3175|null|null|null|null|null|null|null|null|null|null|null|null|null| 5.0|null|
| 3794|null|null|null|null|null|null|null|null|null|null|null| 3.0|null|null|null|
| 2659|null|null|null| 3.0|null|null|null|null|null|null|null|null|null|null|null|
我想这个数据帧因此,它会像
转换成的final case class MovieRatings(movie_id: Long, ratings: Map[Long, Double])
一个DataSet
[1580, [1 -> null, 2 -> null, 3 -> 3.5, 4 -> 5.0, 5 -> null, 6 -> null, 7 -> null,...]]
等等
这可怎么办呢?
这里的事情是用户数量是任意的。我想将这些压缩到一个列中,并保持第一列不变。
答
首先,你必须要变换分析你的数据帧到一个与模式匹配您的案例类,那么你可以使用.as[MovieRatings]
到数据帧转换为Dataset[MovieRatings]
:
import org.apache.spark.sql.functions._
import spark.implicits._
// define a new MapType column using `functions.map`, passing a flattened-list of
// column name (as a Long column) and column value
val mapColumn: Column = map(df.columns.tail.flatMap(name => Seq(lit(name.toLong), $"$name")): _*)
// select movie id and map column with names matching the case class, and convert to Dataset:
df.select($"movieId" as "movie_id", mapColumn as "ratings")
.as[MovieRatings]
.show(false)
答
可以使用spark.sql.functions .map从任意列创建地图。它期望在键和值之间交替的序列可以是列类型或字符串。下面是一个例子:
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions
case class Input(movieId: Int, a: Option[Double], b: Option[Double], c: Option[Double])
val data = Input(1, None, Option(3.5), Option(1.4)) ::
Input(2, Option(4.2), Option(1.34), None) ::
Input(3, Option(1.11), None, Option(3.32)) :: Nil
val df = sc.parallelize(data).toDF
// Exclude the PK column from the map
val mapKeys = df.columns.filterNot(_ == "movieId")
// Build the sequence of key, value, key, value, ..
val pairs = mapKeys.map(k => Seq(lit(k), col(k))).flatten
val mapped = df.select($"movieId", functions.map(pairs:_*) as "map")
mapped.show(false)
产生这样的输出:
+-------+------------------------------------+
|movieId|map |
+-------+------------------------------------+
|1 |Map(a -> null, b -> 3.5, c -> 1.4) |
|2 |Map(a -> 4.2, b -> 1.34, c -> null) |
|3 |Map(a -> 1.11, b -> null, c -> 3.32)|
+-------+------------------------------------+
的可能的复制[火花2.0 - 转换数据帧到数据集(https://*.com/questions/40700213/spark-2 -0-convert-dataframe-to-dataset) – Pavel
我不认为这是重复的,因为这个问题是我该怎么做,这个问题是我正在尝试这样做,它不工作,噢等待,我需要升级Spark。这个问题是要求一个教程,因此是脱离主题。 – jmarkmurphy