2015-10-22 52 views
3

我有一個數據集,我正在從Spark(例如Parquet文件)訪問,其中包含大量的行。我需要將這些行中的一些數據發送到外部服務,並且我需要對它們進行批處理,以便每次對外部服務的調用都包含一定數量的行(例如,每批1000行)。基本上什麼take(n)正在做什麼,但反覆,通過一個大型數據集重複執行。什麼是執行此類任務的好方法?我想這可以用foreach()完成,並手動批量聚合數據,但我不知道是否有任何內置/推薦的方式這樣做。Spark的迭代take()或批處理?

+2

我不會爲這類任務發火花。你確定你不能把這個功能移到別的地方,就像在一個普通的hadoop M/R應用程序中一樣。 – davidshen84

回答

2

當你創建一個拼花數據幀文件,它被基於分區的HDFS塊位置。

因此,第一個問題是問你是否可以將數據集並行寫入外部服務。即同時從多個服務器發送1000行批量。

如果這是好的,那麼最有效的方法是foreachPartition函數。喜歡的東西:

df.rdd.forEachPartition { it => 
    it.grouped(1000).foreach(sendBatch) 
} 

如果外部服務無法使用這種方式,則第二個最好的辦法是toLocalIterator

df.rdd.toLocalIterator { it => 
    it.grouped(1000).foreach(sendBatch) 
} 

請注意,此解決方案是顯著效率較低,因爲它會序列每個分區和從執行者轉移給司機。

+0

如何在java中實現同樣的事情? –

3

我不知道任何內置或推薦的選項,但簡單的解決方案是結合RDD API和Scala Iterable API。如果你申請的操作是冪等,你可以從工人直接做到這一點:

val batchSize: Int = ??? 
val rdd: RDD[T] = ??? 
def doSomething(xs: Seq[T]) = ??? 

rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething)) 

否則,你可以在每次向驅動程序獲取單個分區:

rdd.cache 
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething) 

請注意,它需要一個單獨的作業對於每個分區,因此先緩存輸入rdd以避免重新計算是一個好主意。

在Python中,你可以使用toolz庫作爲替換Iterator API的:

from toolz.itertoolz import partition_all 

rdd.foreachPartition(
    lambda iter: [doSomething(xs) for xs in partition_all(batchSize, iter)]) 

for xs in partition_all(batchSize, rdd.toLocalIterator()): 
    doSomething(xs) 
+0

如果RDD只是由實木複合地板文件支持的DataFrame,我認爲緩存不會有幫助。事實上,如果文件很大,它很可能會減慢速度或者根本不工作。 – kostya