2016-11-19 30 views
1

比方說,我有文件了一堆用逗號分隔的短語:如何計算使用Spark的文檔行中的所有共生​​元素?

love, new, truck, present 
environment, save, trying, stop, destroying 
great, environment, save, money, animals, zoo, daughter, fun 
impressive, loved, speech, inspiration 
Happy Birthday, brother, years, old 
save, money, stop, spending 
new, haircut, love, check it out 

現在我想用星火數共發生元件的數量。因此,我想看看

{ 
    (love, new): 2, 
    (new, truck): 1, 
    (love, truck): 1, 
    (truck, present): 1, 
    (new, present): 1, 
    (love, present): 1, 
    (great, environment): 1, 
    (environment, save): 2, 
    (environment, trying): 1, 
    .... 
    (love, check it out): 1 
} 

有關如何做到這一點的任何建議?

我目前已經創建了文檔的RDD(我叫它phrase_list_RDD),我知道我可以使用phrase_list_RDD.flatMap(lambda line: line.split(","))來解析這個行到元素,但是我很難提出最後一部分來解決我的問題。如果有人有任何建議,我將不勝感激。

回答

-1

一旦你得到的文本行了,你可以將它們分割和計數出現數據框,如下圖所示:

import scala.collection.mutable 

object CoOccurrence { 

    val text = Seq("love, new, truck, present", "environment, save, trying, stop, destroying", "great, environment, save, money, animals, zoo, daughter, fun", "impressive, loved, speech, inspiration", "Happy Birthday, brother, years, old", "save, money, stop, spending", "new, haircut, love, check it out") 

    def main(args: Array[String]) { 
    val cooc = mutable.Map.empty[(String, String), Int] 

    text.foreach { line => 
     val words = line.split(",").map(_.trim).sorted 
     val n = words.length 
     for { 
     i <- 0 until n-1 
     j <- (i + 1) until n 
     } { 
     val currentCount = cooc.getOrElseUpdate((words(i), words(j)), 0) 
     cooc((words(i), words(j))) = currentCount + 1 
     } 
    } 

    println(cooc) 

    } 
} 
+0

的OP使用Apache星火需要的解決方案 - 這似乎不是一個...... –

+0

你是對的。我認爲他們對算法更感興趣 - 但是Spark提供了高級構造,可以簡化解決方案並使其可擴展。如果這是SO上的推薦行爲,我可以刪除我的解決方案。 – radumanolescu

+0

在此解決方案中,生成的同現地圖不會伸縮。此外,它不能被編譯爲按照規模運行的Spark:「getOrElseUpdate」不是您可以在RDD轉換中執行的操作。 – Astral

2

分開後(我已經添加了微調,以擺脫空間),你可以使用List.combinations(2)來獲得兩個單詞的所有組合。傳入flatMap,這將導致與RDD[List[String]]其中每個記錄是大小爲2

名單從那裏 - 這是一個簡單的「字數統計」:

val result: RDD[(List[String], Int)] = phrase_list_RDD 
    .map(_.split(",").map(_.trim).toList) // convert records to List[String] 
    .flatMap(_.combinations(2)) // take all combinations of two words 
    .map((_, 1))     // prepare for reducing - starting with 1 for each combination 
    .reduceByKey(_ + _)   // reduce 

// result: 
// ... 
// (List(environment, daughter),1) 
// (List(save, daughter),1) 
// (List(money, stop),1) 
// (List(great, environment),1) 
// (List(save, stop),2) 
// ... 
相關問題