2017-08-28 31 views
0

在我的用例中,我有一個包含10萬條記錄的配置單元表。每個記錄代表一個必須處理的原始數據文件。處理每個原始數據文件會生成一個csv文件,其大小將在10MB到500MB之間變化。最終,這些CSV文件將作爲單獨的進程填充到HIVE表中。在我的企業集羣中,仍然不建議在hdfs中生成大量的數據。因此,我更願意將這兩個單獨的流程合併爲一個流程,以便他們處理5000條記錄的5000條記錄。處理火花驅動程序中的Hive記錄

我的問題: -

鑑於我RDD指的是整個蜂巢表,我怎麼執行的原始數據處理步驟,每5000條記錄? (類似於for循環,每次增加5000個記錄)

回答

1

一種方法是使用RDD的滑動功能。你可以在apache spark的mllib包中找到它。這裏是你如何使用它。 假設我們有1000個元素

val rdd = sc.parallelize(1 to 1000) 
import org.apache.spark.mllib.rdd._ 
val newRdd = RDDFunctions.fromRDD(rdd) 

// sliding by 10 (instead use 5000 or what you need) 
val rddSlidedBy10 = newRdd.sliding(10, 10) 

的RDD結果會是這樣

Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60), Array(61, 62, 63, 64, 65, 66, 67, 68, 69, 70), Array(71, 72, 73, 74, 75, 76, 77, 78, 79, 80) 

的你可以在陣列和處理原始數據的foreach到CSV

+0

感謝。看來這個方法在spark 2.xx中不幸的是,對於這個用例,我需要使用spark 1.6.2。 – Bala

+0

也有替代方法:https://stackoverflow.com/questions/43877678/spark-split-rdd-elements-into-chunks – dumitru

相關問題