2016-12-05 61 views
0

您好我試圖插入元件RDD陣列[字符串]使用階火花。如何插入元件在火花RDD陣列

這裏是一個例子。

val data = RDD[Array[String]] = Array(Array(1,2,3), Array(1,2,3,4), Array(1,2)). 

我想使這個數據中所有數組的長度爲4。

如果數組的長度小於4,I要填充的陣列中的NULL值。

這裏是我的代碼,我試圖解決的問題。

val newData = data.map(x => 
    if(x.length < 4){ 
     for(i <- x.length until 4){ 
     x.union("NULL") 
     } 
    } 
    else{ 
     x 
    } 
) 

但結果是Array[Any] = Array((), Array(1, 2, 3, 4),())

所以,我想其他辦法。我在for循環中使用了yield

val newData = data.map(x => 
    if(x.length < 4){ 
     for(i <- x.length until 4)yield{ 
     x.union("NULL") 
     } 
    } 
    else{ 
     x 
    } 
) 

結果是Array[Object] = Array(Vector(Array(1, 2, 3, N, U, L, L)), Array(1, 2, 3, 4), Vector(Array(1, 2, N, U, L, L), Array(1, 2, N, U, L, L)))

這些都不是我想要的。我想回到這樣

RDD[Array[String]] = Array(Array(1,2,3,NULL), Array(1,2,3,4), Array(1,2,NULL,NULL)).

我應該怎麼辦? 有沒有辦法解決它?

回答

2

union是一個功能性操作,它不改變所述陣列x。不過,您不需要用循環來完成此操作,而且任何循環實現都可能會更慢 - 使用所有NULL值創建一個新集合要好得多,而不是每次添加空值時都要進行變異。下面是應爲你工作lambda函數:

def fillNull(x: Array[Int], desiredLength: Int): Array[String] = { 
    x.map(_.toString) ++ Array.fill(desiredLength - x.length)("NULL") 
} 

val newData = data.map(fillNull(_, 4)) 
+0

我很欣賞您的意見! 非常感謝! 我會試試! –

1

我用下面的代碼解決了你的使用情況:

val initialRDD = sparkContext.parallelize(Array(Array[AnyVal](1, 2, 3), Array[AnyVal](1, 2, 3, 4), Array[AnyVal](1, 2, 3))) 
val transformedRDD = initialRDD.map(array => 
    if (array.length < 4) { 
    val transformedArray = Array.fill[AnyVal](4)("NULL") 
    Array.copy(array, 0, transformedArray, 0, array.length) 
    transformedArray 
    } else { 
    array 
    } 
) 
val result = transformedRDD.collect() 
+0

這個硬編碼的'4'讓我覺得我可能使用了'array.length'的廣播帽。 –

+0

非常感謝! 這很有用! –