2015-11-01 69 views
0

我是scala的新手,並學習如何使用scala處理twitter流。 我一直在玩下面的示例代碼,並試圖修改它做一些其他的東西。scala twitter streaming:融化元組的元組

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala#L60

我有元組(元組也許不流階確切類型的名稱,但..)總結了這樣的鳴叫每一個元組:(用戶名,(井號標籤的元組),(用戶的元組在本推文中提到))

下面是我用來做這個的代碼。

val sparkConf = new SparkConf().setAppName("TwitterPopularTags") 
val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt)) 
val stream = TwitterUtils.createStream(ssc, None) 

// record username, hashtags, and mentioned user 
val distilled = stream.map(status => (status.getUser.getName, status.getText.split(" ").filter(_.startsWith("#")), status.getText.split(" ").filter(_.startsWith("@")))) 

我想要做的是將這個元組融入(標記,用戶,(提到的用戶))。 例如,如果原來的元組是

(Tom, (#apple, #banana), (@Chris, @Bob)) 

我想要的結果是

((#apple, Tom, (@Chris, @Bob)), (#banana, Tom, (@Chris, @Bob)) 

我的目標是使用#標籤爲重點,以獲得

(#apple, (list of users who tweeted this hashtag), (list of users who were mentioned in tweets with this hashtag)) 
這個結果運行reduceByKey

我不確定'熔化'是用於此目的的正確術語,但可以認爲它與R中的熔化函數類似。是否有一種方法可以使用.map {case ...}來完成此操作或.fla tMap {case ...}?還是我必須定義一個函數來完成這項工作?


ADDED減少問題:

正如我說我要減少reduceByKeyAndWindow的結果,所以我寫了下面的代碼:

// record username, hashtags, and mentioned user 
val distilled = stream.map(
    status => (status.getUser.getName, 
    status.getText.split(" ").filter(_.startsWith("#")), 
    status.getText.split(" ").filter(_.startsWith("@"))) 
) 

val byTags = distilled.flatMap{ 
    case (user, tag, mentioned) => tag.map((_ -> List(1, List(user), mentioned))) 
}.reduceByKeyAndWindow({ 
    case (a, b) => List(a._1+b._1, a._2++b._2, a._3++b._3)}, Seconds(60), Seconds(duration.toInt) 
) 

val sorted = byTags.map(_.flatten).map{ 
    case (tag, count, users, mentioned) => (count, tag, users, mentioned) 
}.transform(_.sortByKey(false)) 

// Print popular hashtags 
sorted.foreachRDD(rdd => { 
    val topList = rdd.take(num.toInt) 
    println("\n%d Popular tags in last %d seconds:".format(num.toInt, duration.toInt)) 
    topList.foreach{case (count, tag, users, mentioned) => println("%s (%s tweets), authors: %s, mentioned: %s".for$ 
}) 

然而,它說

missing parameter type for expanded function 
[error] The argument types of an anonymous function must be fully known. (SLS 8.5) 
[error] Expected type was: ? 
[error]  }.reduceByKeyAndWindow({ 

我試過刪除括號和個案,寫作(a:List,b:List)=>,但是他們都給了我err或與類型相關。什麼是正確的方法來減少它,使用戶和提到將被連接每個「持續時間」秒60秒?

回答

0
hashTags.flatMap{ case (user, tags, mentions) => tags.map((_, user,mentions))} 

在你的問題中最麻煩的事情是濫用術語tuple

python tuple表示可以有任意大小的不可變類型。

在scala TupleN表示具有N型參數的不可變類型恰好包含相應類型的N個成員。所以Tuple2Tuple3不一樣。

斯卡拉其中充滿不可變的類型,任何不可變集合像ListVectorStream可被視爲Python的tuple的模擬。但最精確的可能是immutable.IndexedSeq的子類型,例如Vector

因此,像String.splitAt這樣的方法永遠不會以scala的形式返回Tuple,只是因爲在編譯時無法知道元素數量。

在該具體情況下,結果將是簡單的[Array][5]。我在答案中使用了這樣的假設。

但在情況下,如果你真的有收集(即RDD)型(String, (String, String), (String, String))您可以使用此代碼幾乎相當於一塊

hashTags.flatMap { 
    case (user, (tag1, tag2), mentions) => Seq(tag1, tag2).map((_, user, mentions)) 
} 
+0

感謝您的幫助! –