2017-02-10 27 views
2

有兩種方法可以在兩個併發的Spark作業中使用相同的RDD嗎?如何在併發Spark作業中共享RDD

例如,在下面的應用程序中,我試圖將b寫入磁盤(作業1),同時計算f(作業2)。但是,Spark似乎一次只能執行一項工作。

val conf = new SparkConf() 
val sc = new SparkContext(conf) 

val a = sc.parallelize(0 until 1000) 
val b = a.mapPartitions(it => { Thread.sleep(5000); it }) 

// Compute b 
b.persist().foreachPartition(_ => {}) 

val c = b.mapPartitions(it => { Thread.sleep(5000); it }) 
val d = c.mapPartitions(it => { Thread.sleep(5000); it }) 
val e = d.mapPartitions(it => { Thread.sleep(5000); it }) 
val f = e.mapPartitions(it => { Thread.sleep(5000); it }) 

// Concurrent actions on b and f (f uses b) 
val actionFuts = List(
    // Job 1 
    Future { 
     Thread.sleep(1000) 
     b.saveAsTextFile("output.ignore/test/b.txt") 
    }, 
    // Job 2 
    Future { 
     f.saveAsTextFile("output.ignore/test/f.txt") 
    } 
) 

Await.result(Future.sequence(actionFuts).map(_ =>()), Duration.Inf) 

回答

1

已解決。我正在啓動4個工作人員的spark-submit,而rdd b有4個分區。因此,所有工作人員都很忙,Spark一次只能執行一項工作(每個工作人員有四個分區)。

爲了同時執行這兩個作業,我必須使用更多的工人或更少的分區。