我在Scala中編寫了一個相對簡單的Spark作業,它從S3中讀取一些數據,執行一些轉換和聚合,並最終將結果存儲到存儲庫中。Spark:將RDD元素拆分成塊
在最後階段,我有我的域模型的RDD,並且我想將它們分組爲大塊的元素,以便我可以在我的存儲庫中進行一些大規模插入。
我用RDDFunctions.sliding
方法來實現這一點,它工作得很好。這裏是我的代碼的簡化版本:
val processedElements: RDD[DomainModel] = _
RDDFunctions.fromRDD(processedElements)
.sliding(500, 500)
.foreach { elementsChunk =>
Await.ready(repository.bulkInsert(elementsChunk), 1.minute)
}
問題是,如果例如我有1020個元素,最終只有1000個元素在我的存儲庫中。如果窗口大小大於剩餘元素的數量,它看起來像滑動會忽略任何附加元素。
有什麼辦法可以解決這個問題嗎?如果沒有,有沒有其他方法可以在不使用RDDFunctions.sliding
的情況下實現相同的行爲?
我在開始時嘗試了類似的內容。不幸的是,這導致了內存問題,因爲我最終在內存中保留了很多東西。 – Alex