我想要一個文本文件並創建一個不以點分隔的所有單詞的二元組,刪除任何特殊字符。我試圖用Spark和Scala來做到這一點。如何使用Spark/Scala中的頻率計數從文本文件創建一個bigram?
本文:
你好我的朋友。你今天如何
?再見,我的朋友。
應該產生如下:
招呼我,1個
我的朋友,2
如何,1
今天,1
今天輪空,1
再見我,1
我想要一個文本文件並創建一個不以點分隔的所有單詞的二元組,刪除任何特殊字符。我試圖用Spark和Scala來做到這一點。如何使用Spark/Scala中的頻率計數從文本文件創建一個bigram?
本文:
你好我的朋友。你今天如何
?再見,我的朋友。
應該產生如下:
招呼我,1個
我的朋友,2
如何,1
今天,1
今天輪空,1
再見我,1
對於RDD中的每一行,首先根據'.'
進行分割。然後通過分割' '
來標記每個產生的子字符串。一旦被標記化,用replaceAll
刪除特殊字符並轉換爲小寫字母。每個這些子列表都可以使用sliding
轉換爲包含bigrams的字符串數組的迭代器。
然後,按照要求將bigram數組展平爲字符串並將其轉換爲mkString
,然後在groupBy
和mapValues
之間獲得每個數的計數。
最後從RDD中平滑,縮小和收集(二元組,count)元組。
val rdd = sc.parallelize(Array("Hello my Friend. How are",
"you today? bye my friend."))
rdd.map{
// Split each line into substrings by periods
_.split('.').map{ substrings =>
// Trim substrings and then tokenize on spaces
substrings.trim.split(' ').
// Remove non-alphanumeric characters, using Shyamendra's
// clean replacement technique, and convert to lowercase
map{_.replaceAll("""\W""", "").toLowerCase()}.
// Find bigrams
sliding(2)
}.
// Flatten, and map the bigrams to concatenated strings
flatMap{identity}.map{_.mkString(" ")}.
// Group the bigrams and count their frequency
groupBy{identity}.mapValues{_.size}
}.
// Reduce to get a global count, then collect
flatMap{identity}.reduceByKey(_+_).collect.
// Format and print
foreach{x=> println(x._1 + ", " + x._2)}
you today, 1
hello my, 1
my friend, 2
how are, 1
bye my, 1
today bye, 1
爲了從任何標點符號分開整個單詞考慮例如
val words = text.split("\\W+")
它提供在這種情況下
Array[String] = Array(Hello, my, Friend, How, are, you, today, bye, my, friend)
配對字轉換爲元組證明更內聯用的概念一個二元組,因此考慮例如
for(Array(a,b,_*) <- words.sliding(2).toArray)
yield (a.toLowerCase(), b.toLowerCase())
其通過ohruunuruus產生
Array((hello,my), (my,friend), (friend,How), (how,are),
(are,you), (you,today), (today,bye), (bye,my), (my,friend))
答案否則傳達的簡明的方法。
分割上的不錯工作,雖然這將爲OP不需要的「朋友如何」創建一個雙向的。你對bigrams和元組完全正確。 – ohruunuruus
這應該在星火工作:
def bigramsInString(s: String): Array[((String, String), Int)] = {
s.split("""\.""") // split on .
.map(_.split(" ") // split on space
.filter(_.nonEmpty) // remove empty string
.map(_.replaceAll("""\W""", "") // remove special chars
.toLowerCase)
.filter(_.nonEmpty)
.sliding(2) // take continuous pairs
.filter(_.size == 2) // sliding can return partial
.map{ case Array(a, b) => ((a, b), 1) })
.flatMap(x => x)
}
val rdd = sc.parallelize(Array("Hello my Friend. How are",
"you today? bye my friend."))
rdd.map(bigramsInString)
.flatMap(x => x)
.countByKey // get result in driver memory as Map
.foreach{ case ((x, y), z) => println(s"${x} ${y}, ${z}") }
// my friend, 2
// how are, 1
// today bye, 1
// bye my, 1
// you today, 1
// hello my, 1
我得到這個錯誤:scala.MatchError:[Ljava.lang.String; @ 3b2c2ef(類[Ljava.lang.String;) 在行中:「.map {case Array(a,b)=> ((a,b),1)})「 – oscarm
@ scarm我明白了。如果元素較少,'sliding'將返回最後一個數組中的部分列表。更新了答案。 –
感謝@ohruunuruus。當我嘗試輸出這個文件時,我得到:'[Ljava.lang.String; @ 7358dbec [Ljava.lang.String; @ 4ece9e1d [Ljava.lang.String; @ 6f124cb [Ljava.lang.String; @ 41a68efc [Ljava.lang.String; @ 1df56410 [Ljava.lang.String; @ 5800bbcf [Ljava.lang.String; @ 7ddb1518 [Ljava.lang.String; @ 3a461b35'任何指針? – oscarm
@ scarm現在我已經有了一個解決方法,但我不確定這是做到這一點的最好方法 – ohruunuruus
很容易將連接對連接起來? – oscarm