2017-05-18 40 views
1

我想在不調用.collect()的情況下針對RDD的每個元素對驅動程序執行操作。第一個想法是使用RDD.toLocalIterator()RDD.toLocalIterator急切評價

val config = new SparkConf().setMaster("local[10]").setAppName("xxx") 
val sc: SparkContext = new SparkContext(config) 
val ints: RDD[Int] = sc.parallelize(1 to 50) 
val doubled = ints.map(i => { 
    Thread.sleep(200) 
    println(s"map $i" + Thread.currentThread()) 
    i * 2 
}) 

doubled.toLocalIterator.foreach(i => { 
    println(s"got $i" + Thread.currentThread()) 
}) 

但在接下來的分區的這種情況下,計算僅耗時以前的分區後開始。所以整體計算需要很多時間。 我發明了下面的技巧:

doubled.cache() 
//force rdd to be materialized 
println(doubled.count()) 
//traverse cached rdd 
doubled.toLocalIterator.foreach(i => { 
    println(s"got $i" + Thread.currentThread()) 
}) 

有沒有更好的解決辦法?

回答

-1

爲什麼你不只是使用RDD.foreach方法我認爲這可以執行相同的問題給你的例子。使用這個你可以賺取利潤的並行處理提供火花RDD s

+0

'foreach'運行在工人,我需要運行驅動程序上的函數 – simpadjo

+0

如果你運行驅動程序的功能,我認爲你不能從中受益火花提供的並行處理 –