我有一個數據集,我正在從Spark(例如Parquet文件)訪問,其中包含大量的行。我需要將這些行中的一些數據發送到外部服務,並且我需要對它們進行批處理,以便每次對外部服務的調用都包含一定數量的行(例如,每批1000行)。基本上什麼take(n)
正在做什麼,但反覆,通過一個大型數據集重複執行。什麼是執行此類任務的好方法?我想這可以用foreach()
完成,並手動批量聚合數據,但我不知道是否有任何內置/推薦的方式這樣做。Spark的迭代take()或批處理?
回答
當你創建一個拼花數據幀文件,它被基於分區的HDFS塊位置。
因此,第一個問題是問你是否可以將數據集並行寫入外部服務。即同時從多個服務器發送1000行批量。
如果這是好的,那麼最有效的方法是foreachPartition
函數。喜歡的東西:
df.rdd.forEachPartition { it =>
it.grouped(1000).foreach(sendBatch)
}
如果外部服務無法使用這種方式,則第二個最好的辦法是toLocalIterator
:
df.rdd.toLocalIterator { it =>
it.grouped(1000).foreach(sendBatch)
}
請注意,此解決方案是顯著效率較低,因爲它會序列每個分區和從執行者轉移給司機。
如何在java中實現同樣的事情? –
我不知道任何內置或推薦的選項,但簡單的解決方案是結合RDD API和Scala Iterable
API。如果你申請的操作是冪等,你可以從工人直接做到這一點:
val batchSize: Int = ???
val rdd: RDD[T] = ???
def doSomething(xs: Seq[T]) = ???
rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething))
否則,你可以在每次向驅動程序獲取單個分區:
rdd.cache
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething)
請注意,它需要一個單獨的作業對於每個分區,因此先緩存輸入rdd以避免重新計算是一個好主意。
在Python中,你可以使用toolz
庫作爲替換Iterator
API的:
from toolz.itertoolz import partition_all
rdd.foreachPartition(
lambda iter: [doSomething(xs) for xs in partition_all(batchSize, iter)])
或
for xs in partition_all(batchSize, rdd.toLocalIterator()):
doSomething(xs)
如果RDD只是由實木複合地板文件支持的DataFrame,我認爲緩存不會有幫助。事實上,如果文件很大,它很可能會減慢速度或者根本不工作。 – kostya
- 1. Spark Spark待處理批處理
- 2. 迭代批處理文件夾數組
- 3. 迭代目錄與批處理
- 4. Facebook Graph API批處理 - 迭代分頁
- 5. 使用cURL和批處理線迭代
- 6. DOS批處理命令文件 - 迭代處理
- 7. Spark - Take並減去
- 8. Skip/Take with Spark SQL
- 9. Spark JavaPairRDD迭代
- 10. PHP:mongoDB - 迭代處理
- 11. 無縫處理迭代和非迭代
- 12. 用於kafka主題後處理的spark-streaming批處理間隔
- 13. Dataflow中的迭代處理
- 14. MSBuild批處理迭代器在同一迭代中有所不同
- 15. 批處理文件擺脫輸出中的迭代器編號
- 16. 批處理函數參數和函數內部的迭代
- 17. Spark Streaming - 批處理間隔與處理時間
- 18. MapReduce或批處理作業?
- 19. PowerShell或批處理EXITCODE 7zip
- 20. SVCUtil或WSDL批處理
- 21. 迭代和處理ArrayList
- 22. Windows批處理腳本 - 文件迭代問題
- 23. Windows批處理:同時迭代多個文件行
- 24. 在迭代文本文件上運行批處理命令
- 25. 在批處理文件中對數組進行迭代
- 26. 迭代文件夾名稱批處理文件
- 27. 如何複製文件。迭代空白目錄 - 批處理
- 28. 批處理文件讀取文件內容和迭代
- 29. 使用批處理文件對PATH進行迭代
- 30. 在Spring批處理中創建迭代流程步驟
我不會爲這類任務發火花。你確定你不能把這個功能移到別的地方,就像在一個普通的hadoop M/R應用程序中一樣。 – davidshen84