我有一些Spark代碼可以創建一系列RDD。最後,我撥打randomSplit
將其分成3組,然後我將其中的每一個寫入磁盤。所以,第一個階段是:爲什麼不Spark並行randomSplit
- 得到一些數據
- 做一些轉換
- 緩存結果
- 拆分通過
randomSplit
- 寫所有拆分磁盤
由於步(4)將事物分成3組,這裏有3個不同的Spark階段。邁向第一階段結束後,我們開始運行了階段1級的任務,但具有可執行人:
此時已經計算出的數據集的多個分區。據我瞭解,randomSplit
按分區爲基礎在分區上運行;換句話說,它不需要洗牌或收集 - 它只是在每個分區的基礎上隨機選擇行。如果這是正確的,那麼沒有理由說第2階段的一些任務不能在可用的執行程序上運行 - 它們的RDD的分區已經被計算和緩存。爲什麼Spark不啓動一些階段2的任務來利用可用的資源。
注意:顯然,「他們可以但他們沒有」在這裏回答是完全有效的。我想我真正要問的是,是否有一些技術上的原因,我沒有想到這使得這不可能(或非常困難),或者這只是執行過程中的疏忽?
下面的代碼(在科特林)的簡化版本:
fun run(sc: JavaSparkContext, opts: Options) {
val allData = fetchABunchOfData()
val allDataRdd = sc.parallelize(allData)
val taggedAndTokenized = allDataRdd.mapPartitions { addTagsAndTokens(it) }
// Convert each ResponseData to a JSON String
val jsonStrings = taggedAndTokenized.map {
val mapper = AnxJsonUtils.getMapper()
mapper.writeValueAsString(it)
}
// the randomSplit below creates 3 distinct RDD lineags so if we don't cache the parsing stuff we'll parse the
// entire document set twice.
jsonStrings.cache()
val trainValidTest =
jsonStrings.randomSplit(doubleArrayOf(opts.trainFrac, opts.validFrac, opts.testFrac), splitSeed)
trainValidTest[0].saveAsTextFile(opts.outPath + "/" + TRAIN_NAME)
trainValidTest[1].saveAsTextFile(opts.outPath + "/" + VALID_NAME)
trainValidTest[2].saveAsTextFile(opts.outPath + "/" + TEST_NAME)
}
爲了澄清,你問爲什麼第二個'saveAsTextFile'沒有開始之前,第一個完成? –