MongoDB Spark连接器 - 聚合速度慢
我正在Spark应用程序和Mongo控制台上运行相同的聚合管道。在控制台上,数据在一眨眼之间就被提取出来了,只需要第二次使用“it”来检索所有预期的数据。 根据Spark WebUI,Spark应用程序需要将近两分钟的时间。MongoDB Spark连接器 - 聚合速度慢
正如你所看到的,242级的任务正在推出,以获取结果。我不确定为什么会启动这么大量的任务,而MongoDB汇总只返回40个文档。它看起来有很高的开销。
我的Mongos控制台上运行查询:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
的Spark应用程序代码
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
在那之后,我用hdfs dfs -getmerge /user/spark/output/ output.csv
并比较结果。
聚合为什么这么慢?是不是拨打withPipeline
意味着减少需要传输到Spark的数据量?它看起来不像Mongo控制台那样进行相同的聚合。在Mongos控制台上,它正在快速发展。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。
编辑:我想知道的另一件事是两个执行程序启动(因为我使用默认执行设置atm),但只有一个执行程序完成所有工作。为什么不是第二个执行者做任何工作?
编辑2:当使用不同的聚合管道,并呼吁.count()
代替saveAsTextFile(..)
,也有正在创建242个任务。这次将返回65.000个文件。
大量的任务是由默认的Mongo Spark分区策略引起的。计算分区时它忽略聚合管道,主要有两个原因:
- 它减少了计算分区的成本
- 确保对所有分片和非碎片化partitioners
相同的行为。然而,因为你发现他们可以生成空的分区,在你的情况是昂贵的。
用于固定的选择可能是:
-
更改分区策略
对于选择其他分区,以减少分区的数量。例如,PaginateByCount会将数据库拆分成一组数量的分区。
创建您自己的分区器 - 只需实现特性,您就可以应用聚合管道并对结果进行分区。举例来说,请参阅HalfwayPartitioner和custom partitioner test。
Pre使用$ out将结果集合到一个集合中并从那里读取。
- 使用
coalesce(N)
将分区合并在一起并减少分区数量。 - 增加
spark.mongodb.input.partitionerOptions.partitionSizeMB
配置以产生更少的分区。
自定义分区程序应该会产生最好的解决方案,但是有办法更好地使用可用的默认分区程序。
如果您认为应该有一个使用聚合管道计算分区的默认分区程序,那么请向MongoDB Spark Jira project添加一张标签。
我可以使用'MongoShardedPartitioner'作为具有哈希分片的集合吗?文档中说'shardkey - 该字段应该被索引并包含唯一值.'在我的情况下,我的字段log_file_name:day_of_timestamp:hour_of_timestamp有一个合并的分区键,从而将相关数据保存在一起 - 至少是I希望这样做。但预值散列值不是唯一的。文档是否讨论了哈希值?另外,我还有一个关于如何在聊天中使用MongoSpark进行多个查询的小问题 - 如果你介意看看它。 – j9dy
我会更多地关注用户界面,试图了解242个任务是什么。有了40个文件,我想可以将它们放在一个分区中。 – Ross
@Ross当我运行一个不同的查询和'.count()''aggregatedRdd'而不是将其保存到hdfs时,还会创建242个任务。不同的查询返回几百万个文档。我的收集统计数据是:'数据:15.01GiB文档:45141000大块:443'。我怀疑写入HDFS是个问题。这只是我的Spark应用程序中调用的唯一操作,这就是为什么它被列为Web UI中唯一的阶段。还是我误会了? – j9dy
@Ross我总觉得没有执行聚合管道。我是否必须专门执行聚合管道? – j9dy