2015-08-20 15 views
0
val counts = parsed.mapPartitions(iter => { 
    iter.flatMap(point => { 
    println("points"+point) 
    point.indices.map(i => i,point(i))) 
    }) 
}).countByValue() 
val count = parsed.mapPartitions(iter => { 
    iter.flatMap(point => { 
    println("pointsssss" + point.deep) 
    point.indices.map(i => (i, point(i))) 
    }) 
}).countByValue() 

當我執行count.foreach(println)時,我也從counts得到輸出。我怎樣才能避免這個問題?我們如何避免MapPartition相關的問題?

+1

這不完全清楚你的問題是什麼,也許你可以編輯你的問題? –

+2

無關:您可以使用'point.zipWithIndex.map(_。swap)'而不是'point.indices.map(i =>(i,point(i)))''。 「 –

+0

」也從計數中脫身。「目前還不清楚你的意思。請說明 –

回答

0

您看到兩個打印語句的原因是countByValue本身就是一個操作而不是一個轉換,它會觸發RDD的評估(在這種情況下,它們都是)。從該文檔:

DEF countByValue():地圖[T,龍]

返回作爲地圖上的本RDD每個唯一值的計數(值計數)對。最終的組合步驟在主服務器上本地進行,相當於運行單個減少任務。

您的下一個代碼count.foreach(println)因此發生在Spark的外部,在正常Scala集合中,在主節點中。

檢查,如果這是不是你想要的行爲邏輯,我有懷疑,你想countByKey(),而不是(也動作)。