I have a text variable which is an RDD of String in Scala:

val data = sc.parallelize(List("i am a good boy.Are you a good boy.","You are also working here.","I am posting here today.You are good.")) 

I have another variable in Scala, a Map (given below):

// Words whose document count needs to be found; the initial document count is 1

val dictionary = Map("""good""" -> 1, """working""" -> 1, """posting""" -> 1)

I want to find the document count of every dictionary term and get the output in key-value format.

My output should be similar to the following:

(good,2) 

(working,1) 

(posting,1) 

What I have tried is:

dictionary.map { case(k,v) => k -> k.r.findFirstIn(data.map(line => line.trim()).collect().mkString(",")).size} 

I am getting a count of 1 for all the words.

Please help me fix the above issue.

Thanks in advance.

Answers


Why not use flatMap to create the dictionary? Then you can query it.

val dictionary = data.flatMap {case line => line.split(" ")}.map {case word => (word, 1)}.reduceByKey(_+_) 

If I collect this in the REPL I get the following result:

res9: Array[(String, Int)] = Array((here,1), (good.,1), (good,2), (here.,1), (You,1), (working,1), (today.You,1), (boy.Are,1), (are,2), (a,2), (posting,1), (i,1), (boy.,1), (also,1), (I,1), (am,2), (you,1)) 

Obviously you will need to do a better split than my simple example.
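
For instance, a minimal sketch of a cleaner split (my own; terms is a name I introduce here). It assumes lowercasing the text and splitting on non-word characters is acceptable, and the per-line distinct makes this a document count rather than an occurrence count, which is what the question asks for:

val terms = Set("good", "working", "posting")
val docCounts = data
  .flatMap(line => line.toLowerCase.split("""\W+""").distinct) // each term at most once per document
  .filter(terms.contains)                                      // keep only the dictionary terms
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// docCounts.collect() should give Array((good,2), (working,1), (posting,1))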


First of all, your dictionary should be a Set, because in general terms you need to map terms to the number of documents that contain them.

So your data should look like this:

scala> val docs = List("i am a good boy.Are you a good boy.","You are also working here.","I am posting here today.You are good.") 
docs: List[String] = List(i am a good boy.Are you a good boy., You are also working here., I am posting here today.You are good.) 

And your dictionary should look like this:

scala> val dictionary = Set("good", "working", "posting") 
dictionary: scala.collection.immutable.Set[String] = Set(good, working, posting) 

Then you implement your transformation; for the simplest logic, using contains as the matching function, it might look like:

scala> dictionary.map(k => k -> docs.count(_.contains(k))) toMap 
res4: scala.collection.immutable.Map[String,Int] = Map(good -> 2, working -> 1, posting -> 1) 

For a better solution, I would suggest that you eventually implement a specific function meeting your requirements,

(String, String) => Boolean

which determines the presence of a term in a document:

scala> def foo(doc: String, term: String): Boolean = doc.contains(term) 
foo: (doc: String, term: String)Boolean 

Then the final solution will look like:

scala> dictionary.map(k => k -> docs.count(d => foo(d, k))) toMap 
res3: scala.collection.immutable.Map[String,Int] = Map(good -> 2, working -> 1, posting -> 1) 
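
A nice consequence of factoring the matching out into a (String, String) => Boolean function is that you can swap the logic without touching the counting code. For example, whole-word matching instead of plain substring containment (a sketch of mine, not part of the answer; containsWord is a hypothetical name):

import java.util.regex.Pattern

// Matches "good" in "good." but not in "goodness", unlike String.contains.
def containsWord(doc: String, term: String): Boolean =
  ("""\b""" + Pattern.quote(term) + """\b""").r.findFirstIn(doc).isDefined

// Drop-in replacement: dictionary.map(k => k -> docs.count(d => containsWord(d, k))).toMap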

The last thing you have to do is to use the SparkContext to compute the resulting map. First you have to define which data you want to parallelize. Assuming we want to parallelize the collection of documents, the solution might look like the following:

val docsRDD = sc.parallelize(List(
    "i am a good boy.Are you a good boy.", 
    "You are also working here.", 
    "I am posting here today.You are good." 
)) 
docsRDD.mapPartitions(_.map(doc => dictionary.collect { 
    case term if doc.contains(term) => term -> 1 // one (term, 1) pair per dictionary term found in the doc
})).map(_.toMap) reduce { case (m1, m2) => merge(m1, m2) } 

where merge is defined as:

def merge(m1: Map[String, Int], m2: Map[String, Int]) = 
    (m1 ++ m2) map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) } // keys present in both maps get their counts summed
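
For reference, the same per-term document counts can also be computed in a single pass with reduceByKey instead of reducing whole maps (again a sketch of mine, using the same substring containment as above):

val result = docsRDD
  .flatMap(doc => dictionary.filter(doc.contains)) // the dictionary terms present in this doc
  .map(term => (term, 1))
  .reduceByKey(_ + _)
  .collectAsMap() // should give Map(good -> 2, working -> 1, posting -> 1)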