2016-07-11 21 views
0

我正在將一些map-reduce代碼遷移到Spark中,並且在構造函數中返回Iterable時出現問題。 在MR代碼,我有一個減少功能,通過鍵進行分組,然後(使用multipleOutputs)將迭代的值,並使用寫(在多個輸出,但這是不重要的),以像這樣的代碼(簡化):Spark flatMap/reduce:如何縮放和避免OutOfMemory?

reduce(Key key, Iterable<Text> values) { 
    // ... some code 
    for (Text xml: values) { 
     multipleOutputs.write(key, val, directory); 
    } 
} 

但是,在Spark中,我翻譯了一個地圖,並將其縮小爲以下順序: mapToPair - > groupByKey - > flatMap 推薦...在某些書中。

mapToPair基本上通過函數映射添加一個鍵,該映射基於記錄上的某些值爲該記錄創建一個鍵。有時候一個關鍵字可能有很高的基數。

JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() { 
    public Tuple2<Key, String> call(String value) { 
     //... 
     return functionMap.call(value); 
    } 
}); 

rddPaired施加RDD.groupByKey()來獲取RDD喂flatMap功能:

​​

一旦分組,一flatMap呼叫做減少。在這裏,操作是一個轉型:

public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) { 
    // some code... 
    List<String> out = new ArrayList<String>(); 
    if (someConditionOnKey) { 
     // do a logic 
     Grouper grouper = new Grouper(); 
     for (String xml : keyValue._2()) { 
      // group in a separate class 
      grouper.add(xml); 
     } 
     // operation is now performed on the whole group 
     out.add(operation(grouper)); 
    } else { 
     for (String xml : keyValue._2()) { 
      out.add(operation(xml)); 
     } 
     return out; 
    } 
} 

它正常工作......有沒有太多記錄鍵。實際上,當一個有很多值的密鑰進入reduce的「else」時,它會被OutOfMemory打破。

注意:我已經包含了「if」部分來解釋我想要生成的邏輯,但是當進入「else」時發生故障......因爲當數據進入「else」時,通常意味着會出現根據數據的性質,可以有更多的價值。

很明顯,必須將所有分組的值保留在「out」列表中,如果密鑰具有數百萬條記錄,它將不會縮放,因爲它會將它們保留在內存中。我已經達到了OOM發生的地步(是的,當執行上面要求存儲的「操作」時 - 並沒有給出),但這不是一個非常昂貴的存儲操作)。

有什麼辦法可以避免這種情況的規模?要麼通過使用其他指令複製行爲以更具可擴展性的方式達到相同的輸出,要麼能夠將Spark的值合併(就像我以前用MR所做的那樣)...

+2

您應該發佈完整的火花代碼。 'GroupByKey'也可以成爲內存瓶頸。 – Dikei

+0

謝謝,更新的描述。但是,我可以看到問題出現在flatMap代碼中,因爲: a)如果工作流剛剛達到groupByKey並且不減少,則錯誤不會出現 - 儘管我認爲它們可以合併。 b)必須在返回它們之前將值保存在內存中,這顯然不像可擴展的。在任何情況下,groupByKey都可能會受到影響,但對我來說似乎很清楚,我提到的也是一個問題。 – xmar

回答

1

效率低在flatMap操作中做條件。您應該檢查外部條件以創建兩個不同的RDD並分開處理它們。

rddPaired.cache(); 

// groupFilterFunc will filter which items need grouping 
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.filter(groupFilterFunc).groupByKey(); 
// processGroupedValuesFunction should call `operation` on group of all values with the same key and return the result 
rddGrouped.mapValues(processGroupedValuesFunction); 

// nogroupFilterFunc will filter which items don't need grouping 
JavaPairRDD<Key, Iterable<String>> rddNoGrouped = rddPaired.filter(nogroupFilterFunc); 
// processNoGroupedValuesFunction2 should call `operation` on a single value and return the result 
rddNoGrouped.mapValues(processNoGroupedValuesFunction2); 
+0

是的,我想這是一種可能性,你很緊張。畢竟,原始的MR代碼可能是錯誤的部分! 但是,即使我這樣做了,那麼分組部分將依賴於沒有太多值的密鑰。所以這個問題將成立:是否有一種方法可以以一種有效的內存方式執行該操作(即不必將所有值保存在內存中)? – xmar

+1

這實際上取決於你想要做什麼。如果您的原始MR代碼可以使用組合器進行部分聚合,那麼您可以嘗試在Spark中使用'reduceByKey'或'aggregateByKey'以獲得相似效果。部分聚合在正確使用時會大量減少內存使用量。 – Dikei