我正在運行一個Spark Scala程序,用於在輸入文件中執行文本掃描。我試圖通過使用rdd.mappartition來實現並行性。在mappartition部分內,我正在執行一些檢查並調用map函數來實現每個分區的並行執行。在map函數中,我正在調用一個自定義方法,在那裏執行掃描並將結果發送回去。Spark Map partiton不能在紗線簇模式下工作
現在,當我使用--master local [*]提交代碼時,代碼工作正常,但當我使用--master yarn-cluster提交代碼時,代碼無法正常工作。它的工作沒有任何錯誤,但是調用並沒有進入mappartition本身。我通過放置少量println語句來驗證這一點。
請幫我提一下你的建議。 下面是示例代碼:
def main(args: Array[String]) {
val inputRdd = sc.textFile(inputFile,2)
val resultRdd = inputRdd.mapPartitions{ iter =>
println("Inside scanning method..")
var scanEngine = ScanEngine.getInstance();
...
....
....
var mapresult = iter.map { y =>
line = y
val last = line.lastIndexOf("|");
message = line.substring(last + 1, line.length());
getResponse(message)
}
}
val finalRdd = sc.parallelize(resultRdd.map(x => x.trim()))
finalRdd.coalesce(1, true).saveAsTextFile(hdfsOutpath)
}
def getResponse(input: String): String = {
var result = "";
val rList = new ListBuffer[String]();
try {
//logic here
}
return result;
}
什麼不起作用?你有堆棧跟蹤嗎? –
它正在工作,但在mappartition內寫入的邏輯沒有得到執行,當我在--master紗羣集模式下運行 –
這條線很奇怪 –