我正在將一些map-reduce代碼遷移到Spark中,並且在構造函數中返回Iterable時出現問題。 在MR代碼,我有一個減少功能,通過鍵進行分組,然後(使用multipleOutputs)將迭代的值,並使用寫(在多個輸出,但這是不重要的),以像這樣的代碼(簡化):Spark flatMap/reduce:如何縮放和避免OutOfMemory?
reduce(Key key, Iterable<Text> values) {
// ... some code
for (Text xml: values) {
multipleOutputs.write(key, val, directory);
}
}
但是,在Spark中,我翻譯了一個地圖,並將其縮小爲以下順序: mapToPair - > groupByKey - > flatMap 推薦...在某些書中。
mapToPair基本上通過函數映射添加一個鍵,該映射基於記錄上的某些值爲該記錄創建一個鍵。有時候一個關鍵字可能有很高的基數。
JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() {
public Tuple2<Key, String> call(String value) {
//...
return functionMap.call(value);
}
});
的rddPaired施加RDD.groupByKey()來獲取RDD喂flatMap功能:
一旦分組,一flatMap呼叫做減少。在這裏,操作是一個轉型:
public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
// some code...
List<String> out = new ArrayList<String>();
if (someConditionOnKey) {
// do a logic
Grouper grouper = new Grouper();
for (String xml : keyValue._2()) {
// group in a separate class
grouper.add(xml);
}
// operation is now performed on the whole group
out.add(operation(grouper));
} else {
for (String xml : keyValue._2()) {
out.add(operation(xml));
}
return out;
}
}
它正常工作......有沒有太多記錄鍵。實際上,當一個有很多值的密鑰進入reduce的「else」時,它會被OutOfMemory打破。
注意:我已經包含了「if」部分來解釋我想要生成的邏輯,但是當進入「else」時發生故障......因爲當數據進入「else」時,通常意味着會出現根據數據的性質,可以有更多的價值。
很明顯,必須將所有分組的值保留在「out」列表中,如果密鑰具有數百萬條記錄,它將不會縮放,因爲它會將它們保留在內存中。我已經達到了OOM發生的地步(是的,當執行上面要求存儲的「操作」時 - 並沒有給出),但這不是一個非常昂貴的存儲操作)。
有什麼辦法可以避免這種情況的規模?要麼通過使用其他指令複製行爲以更具可擴展性的方式達到相同的輸出,要麼能夠將Spark的值合併(就像我以前用MR所做的那樣)...
您應該發佈完整的火花代碼。 'GroupByKey'也可以成爲內存瓶頸。 – Dikei
謝謝,更新的描述。但是,我可以看到問題出現在flatMap代碼中,因爲: a)如果工作流剛剛達到groupByKey並且不減少,則錯誤不會出現 - 儘管我認爲它們可以合併。 b)必須在返回它們之前將值保存在內存中,這顯然不像可擴展的。在任何情況下,groupByKey都可能會受到影響,但對我來說似乎很清楚,我提到的也是一個問題。 – xmar