show()/ count()永远不会完成while show()快速运行
问题描述:
我在本地运行Spark并且出现了一个奇怪的问题。基本上,我可以使用DataFrame的show()方法输出任意数量的行,但是,当我尝试使用count()或collect()(甚至是很少量的数据)时,Spark就会停留在该阶段。永远不会完成它的工作。我使用gradle来构建和运行。show()/ count()永远不会完成while show()快速运行
当我运行
./gradlew clean run
程序卡住在
> Building 83% > :run
什么会导致这个问题? 这是代码。
val moviesRatingsDF = MongoSpark.load(sc).toDF().select("movieId", "userId","rating")
val movieRatingsDF = moviesRatingsDF
.groupBy("movieId")
.pivot("userId")
.max("rating")
.na.fill(0)
val ratingColumns = movieRatingsDF.columns.drop(1) // drop the name column
val movieRatingsDS:Dataset[MovieRatingsVector] = movieRatingsDF
.select(col("movieId").as("movie_id"), array(ratingColumns.map(x => col(x)): _*).as("ratings"))
.as[MovieRatingsVector]
val moviePairs = movieRatingsDS.withColumnRenamed("ratings", "ratings1")
.withColumnRenamed("movie_id", "movie_id1")
.crossJoin(movieRatingsDS.withColumnRenamed("ratings", "ratings2").withColumnRenamed("movie_id", "movie_id2"))
.filter(col("movie_id1") < col("movie_id2"))
val movieSimilarities = moviePairs.map(row => {
val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1"))
val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2"))
val corr:Double = Statistics.corr(ratings1, ratings2)
MovieSimilarity(row.getAs[Long]("movie_id1"), row.getAs[Long]("movie_id2"), corr)
}).cache()
val collectedData = movieSimilarities.collect()
println(collectedData.length)
log.warn("I'm done") //never gets here
close
答
星火确实懒惰的评价,并创建RDD/DF的时候一个动作被调用。
要回答你的问题
1。在收集/计数你调用两个不同的动作,柜面如果你不 坚持数据,这将导致RDD/DF进行重新评估,因此 比预期更多的时间。
- 在显示中只有一个动作。它只显示前1000行(手指交叉 ),因此它结束
我可能在这里是错的,但我认为你不应该在'moviePairs'数据集的转换中创建新的RDD? (我指的是两个'sc.parallelize(。)') –
我也觉得它有点低效。但是Statistics.corr方法需要一对RDD。如果我有办法将2个向量传递给它,会很好。但这种情况并非如此。无论如何,这个任务看起来很好(如果我之后打印一些东西 - 它不会花费很长时间)。 –