2015-04-23 18 views
2

Spark: how to group by an arbitrary number of keys?

If I have an RDD of ("a", "b", "c") and a key generator that looks something like

def keygen(x:String) = x match { 
    case "a" => Seq("x","y") 
    case "b" => Seq("x") 
    case "c" => Seq() 
} 

how can I get an RDD of ("x" -> Seq("a", "b"), "y" -> Seq("a"))?

Here is my way of doing this with a key-value RDD.

val sample = sc.parallelize(Seq("a", "b", "c")) 

def keygen(x: String) = x match { 
    case "a" => Seq("x", "y") 
    case "b" => Seq("x") 
    case "c" => Seq() 
} 
val sampleWithKey = sample.flatMap(x => keygen(x).map(y => (y, x))).groupBy(_._1).mapValues(_.map(_._2)) 
val result = sampleWithKey.collect() 
// Concatenate explicitly; println with two arguments would print them as a tuple.
println("result: " + result.mkString("(", ",", ")"))

This gives (x,List(a, b)),(y,List(a))
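The flatMap / groupBy steps above can be checked without a cluster. The following is a minimal sketch using plain Scala collections rather than an RDD; the object name `KeygenLocalCheck` and the helper `invert` are hypothetical names introduced only for this illustration, and `keygen` is copied from the question with an explicit return type.

```scala
// Local sketch with plain Scala collections (no Spark needed).
// KeygenLocalCheck and invert are illustrative names, not part of the question.
object KeygenLocalCheck {
  // Same key generator as in the question; "c" produces no keys.
  def keygen(x: String): Seq[String] = x match {
    case "a" => Seq("x", "y")
    case "b" => Seq("x")
    case "c" => Seq.empty
  }

  // Turn element -> keys into key -> elements.
  def invert(items: Seq[String]): Map[String, Seq[String]] =
    items
      .flatMap(x => keygen(x).map(key => (key, x))) // one (key, element) pair per generated key
      .groupBy(_._1)                                // group the pairs by key
      .map { case (key, pairs) => key -> pairs.map(_._2) } // keep only the elements

  def main(args: Array[String]): Unit =
    println(invert(Seq("a", "b", "c")))
}
```

Because `keygen("c")` returns an empty sequence, `flatMap` emits no pairs for "c", so it simply does not appear in the grouped result.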

+0

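Hmm... your 'keygen' produces a 'Seq[String]', so I don't think you want an 'RDD' whose keys are 'String'. Also, your 'RDD' should be type-safe, so you can't have an 'RDD' with inconsistent element types, such as '("x" -> ("a", "b"), "y" -> ("b"))', which would be '[(String, (String, String)), (String, (String))]'. –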

+0

@SarveshKumarSingh I have edited the question. – Renkai

Answers

0

Hmm... it looks like a strange thing to do, but you can do it as follows:

def keygen(x:String) = x match { 
    case "a" => Seq("x","y") 
    case "b" => Seq("x") 
    case "c" => Seq("Empty") 
} 


val stringRdd = sc.parallelize(List("a", "b", "c")) 
// RDD[ "a", "b", "c" ] 

val keyedRdd = stringRdd.map(string => (keygen(string), string)) 
// RDD[ (Seq("x", "y"), "a"), (Seq("x"), "b"), (Seq("Empty"), "c") ] 

val keyFlatRdd = keyedRdd 
    .flatMap({ case (keySeq, string) => keySeq.map(key => (key, string)) }) 
    .filter({ case (key, string) => !key.equalsIgnoreCase("Empty") }) 
// RDD[ ("x", "a"), ("y", "a"), ("x", "b") ] 

val finalRdd = keyFlatRdd 
    .groupBy({ case (key, string) => key }) 
    .map({ case (key, seq) => (key, seq.map(_._2)) }) 
// RDD[ ("x", Seq("a", "b")), ("y", Seq("a")) ]
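As a possible simplification, the "Empty" placeholder may not be needed: if `keygen` returns an empty `Seq` for "c", `flatMap` already emits nothing for that element. Below is a minimal sketch under that assumption. It assumes Spark 1.3 or later (so the pair-RDD implicits are in scope without an extra import) and a local master; the object name `GroupByGeneratedKeys` and the method `group` are hypothetical names used only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Illustrative sketch; GroupByGeneratedKeys and group are assumed names.
object GroupByGeneratedKeys {
  // Key generator from the question: "c" produces no keys at all.
  def keygen(x: String): Seq[String] = x match {
    case "a" => Seq("x", "y")
    case "b" => Seq("x")
    case "c" => Seq.empty
  }

  // Emit one (key, element) pair per generated key, then group by key.
  def group(rdd: RDD[String]): RDD[(String, List[String])] =
    rdd
      .flatMap(x => keygen(x).map(key => (key, x)))
      .groupByKey()
      .mapValues(_.toList)

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption for running the sketch on a single machine.
    val conf = new SparkConf().setAppName("group-by-generated-keys").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      group(sc.parallelize(Seq("a", "b", "c"))).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Note that `groupByKey` does not guarantee the order of values within a group, so sort the lists if a stable order matters.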