2017-02-05 34 views
1

我有一些Spark代碼可以創建一系列RDD。最後,我撥打randomSplit將其分成3組,然後我將其中的每一個寫入磁盤。所以,第一個階段是:爲什麼不Spark並行randomSplit

  1. 得到一些數據
  2. 做一些轉換
  3. 緩存結果
  4. 拆分通過randomSplit
  5. 寫所有拆分磁盤

由於步(4)將事物分成3組,這裏有3個不同的Spark階段。邁向第一階段結束後,我們開始運行了階段1級的任務,但具有可執行人:

enter image description here

此時已經計算出的數據集的多個分區。據我瞭解,randomSplit按分區爲基礎在分區上運行;換句話說,它不需要洗牌或收集 - 它只是在每個分區的基礎上隨機選擇行。如果這是正確的,那麼沒有理由說第2階段的一些任務不能在可用的執行程序上運行 - 它們的RDD的分區已經被計算和緩存。爲什麼Spark不啓動一些階段2的任務來利用可用的資源。

注意:顯然,「他們可以但他們沒有」在這裏回答是完全有效的。我想我真正要問的是,是否有一些技術上的原因,我沒有想到這使得這不可能(或非常困難),或者這只是執行過程中的疏忽?

下面的代碼(在科特林)的簡化版本:

fun run(sc: JavaSparkContext, opts: Options) { 
    val allData = fetchABunchOfData() 

    val allDataRdd = sc.parallelize(allData) 

    val taggedAndTokenized = allDataRdd.mapPartitions { addTagsAndTokens(it) } 

    // Convert each ResponseData to a JSON String 
    val jsonStrings = taggedAndTokenized.map { 
     val mapper = AnxJsonUtils.getMapper() 
     mapper.writeValueAsString(it) 
    } 

    // the randomSplit below creates 3 distinct RDD lineags so if we don't cache the parsing stuff we'll parse the 
    // entire document set twice. 
    jsonStrings.cache() 
    val trainValidTest = 
      jsonStrings.randomSplit(doubleArrayOf(opts.trainFrac, opts.validFrac, opts.testFrac), splitSeed) 
    trainValidTest[0].saveAsTextFile(opts.outPath + "/" + TRAIN_NAME) 
    trainValidTest[1].saveAsTextFile(opts.outPath + "/" + VALID_NAME) 
    trainValidTest[2].saveAsTextFile(opts.outPath + "/" + TEST_NAME) 
} 
+0

爲了澄清,你問爲什麼第二個'saveAsTextFile'沒有開始之前,第一個完成? –

回答

1

出於多種原因,saveAsTextFile是一個阻塞調用。這意味着Spark主機在第一個完成之前不會收到第二個保存指令。

也就是說,如果您希望利用這些可用資源,您可以做的是在三個獨立的線程中撥打saveAsTextFile並等待他們的未來。一旦工作人員完成第一個任務的分區,就可以從第二個任務開始。

相關問題