2016-12-15 47 views
2

我有一個相對簡單的問題。Spark分割RDD分塊和連接

我有一個大的Spark RDD [String](包含JSON)。在我的用例中,我想將N個字符串分組(連接)爲一個新的RDD [String],以便它的大小爲oldRDD.size/N。

僞例如:

val oldRDD : RDD[String] = ['{"id": 1}', '{"id": 2}', '{"id": 3}', '{"id": 4}'] 

val newRDD : RDD[String] = someTransformation(oldRDD, ",", 2) 
newRDD = ['{"id": 1},{"id": 2}','{"id": 3},{"id": 4}'] 

val anotherRDD : RDD[String] = someTransformation(oldRDD, ",", 3) 
anotherRDD = ['{"id": 1},{"id": 2},{"id": 3}','{"id": 4}'] 

我已經找了一個類似的案件,但無法找到任何東西。

謝謝!

回答

2

在這裏你必須使用zipWithIndex函數,然後計算組。

例如,index = 3和n(組數)= 2給出第二組。 3/2 = 1(整數除法),所以基於0的第二組

val n = 3; 
val newRDD1 = oldRDD.zipWithIndex() // creates tuples (element, index) 
    // map to tuple (group, content) 
    .map(x => (x._2/n, x._1)) 
    // merge 
    .reduceByKey(_ + ", " + _) 
    // remove key 
    .map(x => x._2) 

一個注: 「zipWithIndex」 的順序是內部訂單。它在業務邏輯中可能沒有意義,您必須檢查您的情況下訂單是否正常。如果不是,請對RDD進行分類,然後使用zipWithIndex

+0

這是一個很好的答案!但是在這種情況下'n'不會是組數;相反,它是組的*大小*。如果你希望'n'是組的*號*,你需要使用模運算符而不是除法,並且注意元素的排序不會被保留。 – vaerek