First of all, your dictionary should be a Set, since in general you want to map terms to the number of documents that contain them.
So your data should look like this:
scala> val docs = List("i am a good boy.Are you a good boy.","You are also working here.","I am posting here today.You are good.")
docs: List[String] = List(i am a good boy.Are you a good boy., You are also working here., I am posting here today.You are good.)
And your dictionary should look like this:
scala> val dictionary = Set("good", "working", "posting")
dictionary: scala.collection.immutable.Set[String] = Set(good, working, posting)
Then you implement your transformation. With a contains predicate, the simplest logic might look like:
scala> dictionary.map(k => k -> docs.count(_.contains(k))).toMap
res4: scala.collection.immutable.Map[String,Int] = Map(good -> 2, working -> 1, posting -> 1)
For a better solution, I would suggest you implement a dedicated function matching your requirement, of type
(String, String) => Boolean
that decides whether a term is present in a document:
scala> def foo(doc: String, term: String): Boolean = doc.contains(term)
foo: (doc: String, term: String)Boolean
Then the final solution will look like:
scala> dictionary.map(k => k -> docs.count(d => foo(d, k))).toMap
res3: scala.collection.immutable.Map[String,Int] = Map(good -> 2, working -> 1, posting -> 1)
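One caveat with contains is that it matches substrings, so for example "work" would also match inside "working". If you need whole-word matching, a tokenizing predicate can be swapped in for foo. This is a minimal sketch; containsWord is a hypothetical name, and the split pattern assumes purely alphabetic terms:

```scala
// Hypothetical whole-word predicate: split the document on non-letter
// characters and compare complete tokens, ignoring case.
def containsWord(doc: String, term: String): Boolean =
  doc.split("[^A-Za-z]+").exists(_.equalsIgnoreCase(term))

val docs = List(
  "i am a good boy.Are you a good boy.",
  "You are also working here.",
  "I am posting here today.You are good."
)
val dictionary = Set("good", "working", "posting")

// Same counting logic as before, with the stricter predicate plugged in.
val counts = dictionary.map(k => k -> docs.count(d => containsWord(d, k))).toMap
// counts == Map("good" -> 2, "working" -> 1, "posting" -> 1)
```

With this data the result is the same as with contains, but a term such as "work" would now count 0 documents instead of matching "working".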
The last thing you have to do is use the SparkContext to compute the resulting map. First you have to decide which data you want to parallelize. Suppose we want to parallelize the collection of documents; then the solution might look like this:
val docsRDD = sc.parallelize(List(
"i am a good boy.Are you a good boy.",
"You are also working here.",
"I am posting here today.You are good."
))
docsRDD
  .mapPartitions(_.map(doc =>
    dictionary.collect { case term if doc.contains(term) => term -> 1 }.toMap))
  .reduce(merge)

// Sum the per-key counts of two maps. Note that m2.map(...) must be applied
// before ++; writing (m1 ++ m2) map {...} instead would double-count keys
// that are present only in m1.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
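The Spark step can be sanity-checked without a cluster by replacing the RDD with a plain List: the shape of the pipeline (one small Map per document, then a pairwise reduce with merge) is identical. A minimal sketch, assuming the same docs and dictionary as above:

```scala
val docs = List(
  "i am a good boy.Are you a good boy.",
  "You are also working here.",
  "I am posting here today.You are good."
)
val dictionary = Set("good", "working", "posting")

// Sum the per-key counts of two maps; m2.map(...) is applied before ++,
// so keys that occur only in m1 keep their original count.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

// One Map per document, marking each dictionary term it contains with 1.
val perDoc = docs.map(doc =>
  dictionary.collect { case term if doc.contains(term) => term -> 1 }.toMap)

// Pairwise reduction, just like RDD.reduce would do across partitions.
val total = perDoc.reduce(merge)
// total == Map("good" -> 2, "working" -> 1, "posting" -> 1)
```

Because merge is associative and commutative, the result does not depend on how Spark splits the documents across partitions, which is exactly what reduce requires.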