2017-05-09 148 views
0

我在Scala中編寫了一個相對簡單的Spark作業,它從S3中讀取一些數據,執行一些轉換和聚合,並最終將結果存儲到存儲庫中。Spark:將RDD元素拆分成塊

在最後階段,我有我的域模型的RDD,並且我想將它們分組爲大塊的元素,以便我可以在我的存儲庫中進行一些大規模插入。

我用RDDFunctions.sliding方法來實現這一點,它工作得很好。這裏是我的代碼的簡化版本:

val processedElements: RDD[DomainModel] = _ 
RDDFunctions.fromRDD(processedElements) 
    .sliding(500, 500) 
    .foreach { elementsChunk => 
     Await.ready(repository.bulkInsert(elementsChunk), 1.minute) 
    } 

問題是,如果例如我有1020個元素,最終只有1000個元素在我的存儲庫中。如果窗口大小大於剩餘元素的數量,它看起來像滑動會忽略任何附加元素。

有什麼辦法可以解決這個問題嗎?如果沒有,有沒有其他方法可以在不使用RDDFunctions.sliding的情況下實現相同的行爲?

回答

0

難道你不能只使用foreachPartition和手動批次管理?

fromRDD.foreachPartition(items: Iterator[DomainModel] => { 
    val batch = new ArrayBuffer[DomainModel](BATCH_SIZE) 
    while (items.hasNext) { 
    if (batch.size >= BATCH_SIZE) { 
     bulkInsert(batch) 
     batch.clear() 
    } 
    batch += items.next 
    } 
    if (!batch.isEmpty) { 
     bulkInsert(batch) 
    } 
}) 
+0

我在開始時嘗試了類似的內容。不幸的是,這導致了內存問題,因爲我最終在內存中保留了很多東西。 – Alex

0

你說得對,星火的sliding(不像Scala的),會如果窗口大小超過其餘項目的數量,根據RDDFunctions doc產生一個空RDD。 Spark也沒有與Scala的grouped等價。

如果您知道要創建多少個羣組,則可能適用的解決方法是將RDD與modulo過濾器分開。下面是將RDD分成5組的簡單示例:

val rdd = sc.parallelize(Seq(
    (0, "text0"), (1, "text1"), (2, "text2"), (3, "text2"), (4, "text2"), (5, "text5"), 
    (6, "text6"), (7, "text7"), (8, "text8"), (9, "text9"), (10, "text10"), (11, "text11") 
)) 

def g(n:Int)(x: Int): Boolean = { x % 5 == n } 

val rddList = (0 to 4).map(n => rdd.filter(x => g(n)(x._1))) 

(0 to 4).foreach(n => rddList(n).collect.foreach(println)) 

(0,text0) 
(5,text5) 
(10,text10) 

(1,text1) 
(6,text6) 
(11,text11) 

(2,text2) 
(7,text7) 

(3,text2) 
(8,text8) 

(4,text2) 
(9,text9) 
+0

不幸的是我不知道會有多少組。投入在不斷變化,所以每次我們可能會得到不同數量的團體。 – Alex