2016-11-04 120 views
3

我正在Spark應用程序和Mongo控制檯上運行相同的聚合管道。在控制檯上,數據在一眨眼之間就被提取出來了,只需要第二次使用「it」來檢索所有預期的數據。 根據Spark WebUI,Spark應用程序需要將近兩分鐘的時間。MongoDB Spark連接器 - 聚合速度慢

enter image description here

正如你所看到的,242級的任務正在推出,以獲取結果。我不確定爲什麼會啓動這麼大量的任務,而MongoDB彙總只返回40個文檔。它看起來有很高的開銷。

我的Mongos控制檯上運行查詢:

db.data.aggregate([ 
    { 
     $match:{ 
     signals:{ 
      $elemMatch:{ 
       signal:"SomeSignal", 
       value:{ 
        $gt:0, 
        $lte:100 
       } 
      } 
     } 
     } 
    }, 
    { 
     $group:{ 
     _id:"$root_document", 
     firstTimestamp:{ 
      $min:"$ts" 
     }, 
     lastTimestamp:{ 
      $max:"$ts" 
     }, 
     count:{ 
      $sum:1 
     } 
     } 
    } 
]) 

的Spark應用程序代碼

JavaMongoRDD<Document> rdd = MongoSpark.load(sc); 

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
      Document.parse(
        "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"), 
      Document.parse(
        "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }"))); 

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() { 
     @Override 
     public String call(Document arg0) throws Exception { 
      String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(), 
        arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(), 
        arg0.get("count").toString()); 
      return output; 
     } 
    }); 

    outputRdd.saveAsTextFile("/user/spark/output"); 

在那之後,我用hdfs dfs -getmerge /user/spark/output/ output.csv並比較結果。

聚合爲什麼這麼慢?是不是撥打withPipeline意味着減少需要傳輸到Spark的數據量?它看起來不像Mongo控制檯那樣進行相同的聚合。在Mongos控制檯上,它正在快速發展。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。

編輯:我想知道的另一件事是兩個執行程序啓動(因爲我使用默認執行設置atm),但只有一個執行程序完成所有工作。爲什麼不是第二個執行者做任何工作?

enter image description here

編輯2:當使用不同的聚合管道,並呼籲.count()代替saveAsTextFile(..),也有正在創建242個任務。這次將返回65.000個文件。 enter image description here

+1

我會更多地關注用戶界面,試圖瞭解242個任務是什麼。有了40個文件,我想可以將它們放在一個分區中。 – Ross

+0

@Ross當我運行一個不同的查詢和'.count()''aggregatedRdd'而不是將其保存到hdfs時,還會創建242個任務。不同的查詢返回幾百萬個文檔。我的收集統計數據是:'數據:15.01GiB文檔:45141000大塊:443'。我懷疑寫入HDFS是個問題。這只是我的Spark應用程序中調用的唯一操作,這就是爲什麼它被列爲Web UI中唯一的階段。還是我誤會了? – j9dy

+0

@Ross我總覺得沒有執行聚合管道。我是否必須專門執行聚合管道? – j9dy

回答

2

大量的任務是由默認的Mongo Spark分區策略引起的。計算分區時它忽略聚合管道,主要有兩個原因:

  1. 它減少了計算分區的成本
  2. 確保對所有分片和非碎片化partitioners

相同的行爲。然而,因爲你發現他們可以生成空的分區,在你的情況是昂貴的。

用於固定的選擇可能是:

  1. 更改分區策略

    對於選擇其他分區,以減少分區的數量。例如,PaginateByCount會將數據庫拆分成一組數量的分區。

    創建您自己的分區器 - 只需實現特性,您就可以應用聚合管道並對結果進行分區。舉例來說,請參閱HalfwayPartitionercustom partitioner test

  2. Pre使用$ out將結果集合到一個集合中並從那裏讀取。

  3. 使用coalesce(N)將分區合併在一起並減少分區數量。
  4. 增加spark.mongodb.input.partitionerOptions.partitionSizeMB配置以產生更少的分區。

自定義分區程序應該會產生最好的解決方案,但是有辦法更好地使用可用的默認分區程序。

如果您認爲應該有一個使用聚合管道計算分區的默認分區程序,那麼請向MongoDB Spark Jira project添加一張標籤。

+0

我可以使用'MongoShardedPartitioner'作爲具有哈希分片的集合嗎?文檔中說'shardkey - 該字段應該被索引幷包含唯一值.'在我的情況下,我的字段log_file_name:day_of_timestamp:hour_of_timestamp有一個合併的分區鍵,從而將相關數據保存在一起 - 至少是I希望這樣做。但預值散列值不是唯一的。文檔是否討論了哈希值?另外,我還有一個關於如何在聊天中使用MongoSpark進行多個查詢的小問題 - 如果你介意看看它。 – j9dy