2016-01-20 104 views
1
val data: RDD [(String, Array[Int])] = sc.parallelize(Seq(
    ("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14)) 
)) 

val codes = sc.parallelize(Seq(2, 3, 12, 13)) 

val result = data.map {case (id,values) => (id, values.diff(codes))} 

我想獲得的結果爲:星火RDD:設置差異

val result: Array[(String, Array[Int])] = Array(
    ("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14)) 
) 

然而,當我做差集,我得到的類型不匹配的錯誤。

+0

你的代碼是失去了一些東西......是什麼'dummy_data_sorted' ? –

+0

很抱歉更新了代碼。 – user3803714

回答

1

如果你想在本地數據結構上應用操作,沒有理由並行化codes。只是mapValues這樣的:

val codes = Seq(2, 3, 12, 13) 
val result = data.mapValues(_.diff(codes)) 

如果代碼無法裝入內存,你必須做一些稍微複雜一些:

// Add dummy values to codes 
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null)) 

data 
    .flatMapValues(x => x) // Flatten values (k, vs) => (k, v) 
    .map(_.swap) // Swap order => (v, k) 
    .subtractByKey(codes) // Difference 
    .map(_.swap) // Swap order => (k, v) 
    .groupByKey // Group => (k, vs) 
+0

我也會在第一個例子中使'codes'成爲一個廣播變量。我認爲你的第二個例子並不比'groupByKey','join'等更高效,這在使用時看起來更直觀。 –

+0

@RaduIonescu如果「代碼」相對較大,那麼確定。如果它在這個例子中很小,那麼不值得所有的大驚小怪的恕我直言。關於你的第二個評論。你能提供一些例子嗎? – zero323

+0

'data.flatMapValues(x => x).map(_。swap).leftOuterJoin(codes).map(...)'或'data.flatMapValues(x => x).map(_。swap)。 groupByKey()。濾波器(...)。圖()'。 groupByKey看起來可能會輸出更多的元組,但由於DAG調度程序,過濾器和映射操作將在元組基上執行,並最終獲得相同的輸出。 –