我正在Spark應用程序和Mongo控制檯上運行相同的聚合管道。在控制檯上,數據在一眨眼之間就被提取出來了,只需要第二次使用「it」來檢索所有預期的數據。 根據Spark WebUI,Spark應用程序需要將近兩分鐘的時間。MongoDB Spark連接器 - 聚合速度慢
正如你所看到的,242級的任務正在推出,以獲取結果。我不確定爲什麼會啓動這麼大量的任務,而MongoDB彙總只返回40個文檔。它看起來有很高的開銷。
我的Mongos控制檯上運行查詢:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
的Spark應用程序代碼
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
在那之後,我用hdfs dfs -getmerge /user/spark/output/ output.csv
並比較結果。
聚合爲什麼這麼慢?是不是撥打withPipeline
意味着減少需要傳輸到Spark的數據量?它看起來不像Mongo控制檯那樣進行相同的聚合。在Mongos控制檯上,它正在快速發展。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。
編輯:我想知道的另一件事是兩個執行程序啓動(因爲我使用默認執行設置atm),但只有一個執行程序完成所有工作。爲什麼不是第二個執行者做任何工作?
編輯2:當使用不同的聚合管道,並呼籲.count()
代替saveAsTextFile(..)
,也有正在創建242個任務。這次將返回65.000個文件。
我會更多地關注用戶界面,試圖瞭解242個任務是什麼。有了40個文件,我想可以將它們放在一個分區中。 – Ross
@Ross當我運行一個不同的查詢和'.count()''aggregatedRdd'而不是將其保存到hdfs時,還會創建242個任務。不同的查詢返回幾百萬個文檔。我的收集統計數據是:'數據:15.01GiB文檔:45141000大塊:443'。我懷疑寫入HDFS是個問題。這只是我的Spark應用程序中調用的唯一操作,這就是爲什麼它被列爲Web UI中唯一的階段。還是我誤會了? – j9dy
@Ross我總覺得沒有執行聚合管道。我是否必須專門執行聚合管道? – j9dy