2017-02-16 34 views
-1

我正在運行一個Spark Scala程序,用於在輸入文件中執行文本掃描。我試圖通過使用rdd.mappartition來實現並行性。在mappartition部分內,我正在執行一些檢查並調用map函數來實現每個分區的並行執行。在map函數中,我正在調用一個自定義方法,在那裏執行掃描並將結果發送回去。Spark Map partiton不能在紗線簇模式下工作

現在,當我使用--master local [*]提交代碼時,代碼工作正常,但當我使用--master yarn-cluster提交代碼時,代碼無法正常工作。它的工作沒有任何錯誤,但是調用並沒有進入mappartition本身。我通過放置少量println語句來驗證這一點。

請幫我提一下你的建議。 下面是示例代碼:

def main(args: Array[String]) { 

    val inputRdd = sc.textFile(inputFile,2) 
    val resultRdd = inputRdd.mapPartitions{ iter => 

    println("Inside scanning method..") 
    var scanEngine = ScanEngine.getInstance(); 
    ... 
    .... 
    .... 
    var mapresult = iter.map { y => 
     line = y 
     val last = line.lastIndexOf("|"); 
     message = line.substring(last + 1, line.length()); 
     getResponse(message) 
    } 
    } 

    val finalRdd = sc.parallelize(resultRdd.map(x => x.trim())) 
    finalRdd.coalesce(1, true).saveAsTextFile(hdfsOutpath) 

} 

def getResponse(input: String): String = { 
    var result = ""; 
    val rList = new ListBuffer[String](); 

    try { 
     //logic here 
    } 
    return result; 
} 
+0

什麼不起作用?你有堆棧跟蹤嗎? –

+0

它正在工作,但在mappartition內寫入的邏輯沒有得到執行,當我在--master紗羣集模式下運行 –

+1

這條線很奇怪 –

回答

1

如果它的工作是看到內掃描方法的證據..打印出來,也不會當,因爲該代碼被執行羣集上運行出現工人,而不是司機。

你將不得不在法庭細節中仔細閱讀代碼,以開放的心態去嘗試找出工作沒有輸出的原因。通常,當作業在本地模式下工作時,而不是在羣集上時,這是由於代碼在其中執行的位置或記錄輸出的位置的微妙之處。

有太多限制的代碼來提供更具體的答案。

+0

編輯解決實際問題! – ImDarrenG

+0

我正在從mappartition中調用的方法返回一個迭代器....我沒有在這裏發佈完整的程序。此外,我使用一個可變列表集合來保存多個返回的結果,並最終使用List.mkString(「\ n」)方法將其轉換爲字符串,然後將其返回...我懷疑Mutable集合的用法是問題....你對此有何看法? –

+0

@DILIPKUMAR可變集合的使用不應該導致問題。 'getResponse'中的'try'語句防止了異常被報告? – ImDarrenG

0

Spark使用map函數以及mapPartitions實現了並行性。分區的數量決定了並行的數量,但無論您是否使用mapPartitions函數,每個分區都將獨立執行。

只有幾個理由使用mapPartitions而不是map;例如功能的初始化成本很高,但可以多次調用它,例如在文本上執行一些NLP任務

相關問題