我是scala的新手,並學習如何使用scala處理twitter流。 我一直在玩下面的示例代碼,並試圖修改它做一些其他的東西。scala twitter streaming:融化元組的元組
我有元組(元組也許不流階確切類型的名稱,但..)總結了這樣的鳴叫每一個元組:(用戶名,(井號標籤的元組),(用戶的元組在本推文中提到))
下面是我用來做這個的代碼。
val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt))
val stream = TwitterUtils.createStream(ssc, None)
// record username, hashtags, and mentioned user
val distilled = stream.map(status => (status.getUser.getName, status.getText.split(" ").filter(_.startsWith("#")), status.getText.split(" ").filter(_.startsWith("@"))))
我想要做的是將這個元組融入(標記,用戶,(提到的用戶))。 例如,如果原來的元組是
(Tom, (#apple, #banana), (@Chris, @Bob))
我想要的結果是
((#apple, Tom, (@Chris, @Bob)), (#banana, Tom, (@Chris, @Bob))
我的目標是使用#標籤爲重點,以獲得
(#apple, (list of users who tweeted this hashtag), (list of users who were mentioned in tweets with this hashtag))
這個結果運行reduceByKey
我不確定'熔化'是用於此目的的正確術語,但可以認爲它與R中的熔化函數類似。是否有一種方法可以使用.map {case ...}來完成此操作或.fla tMap {case ...}?還是我必須定義一個函數來完成這項工作?
ADDED減少問題:
正如我說我要減少reduceByKeyAndWindow的結果,所以我寫了下面的代碼:
// record username, hashtags, and mentioned user
val distilled = stream.map(
status => (status.getUser.getName,
status.getText.split(" ").filter(_.startsWith("#")),
status.getText.split(" ").filter(_.startsWith("@")))
)
val byTags = distilled.flatMap{
case (user, tag, mentioned) => tag.map((_ -> List(1, List(user), mentioned)))
}.reduceByKeyAndWindow({
case (a, b) => List(a._1+b._1, a._2++b._2, a._3++b._3)}, Seconds(60), Seconds(duration.toInt)
)
val sorted = byTags.map(_.flatten).map{
case (tag, count, users, mentioned) => (count, tag, users, mentioned)
}.transform(_.sortByKey(false))
// Print popular hashtags
sorted.foreachRDD(rdd => {
val topList = rdd.take(num.toInt)
println("\n%d Popular tags in last %d seconds:".format(num.toInt, duration.toInt))
topList.foreach{case (count, tag, users, mentioned) => println("%s (%s tweets), authors: %s, mentioned: %s".for$
})
然而,它說
missing parameter type for expanded function
[error] The argument types of an anonymous function must be fully known. (SLS 8.5)
[error] Expected type was: ?
[error] }.reduceByKeyAndWindow({
我試過刪除括號和個案,寫作(a:List,b:List)=>,但是他們都給了我err或與類型相關。什麼是正確的方法來減少它,使用戶和提到將被連接每個「持續時間」秒60秒?
感謝您的幫助! –