2014-10-11 49 views
1

好大師的,我有一個問題,對我沒有多大意義。 我堅持努力拯救的目的是MongoDB的,看起來像這樣(大約)MongoDBObject沒有被添加到rrd foreach循環內casbah scala apache spark

{data:[baseball:[{timestamp (essentially):tweet},{timestamp: another tweet}] 
     football:[{timestamp:footballtweet},{timestamp:differentfootballtweet}] 
     ] 
timeInterval:"last minute to this minute" (i'm doing timeseries data) 
terms:["football","baseball"] 
} 

見下面哪個環路IM卡上。請注意,該問題可能與 rrd過期有關。我試圖通過堅持它在內存中解決它,但我不知道該怎麼做。

twitterStream.foreachRDD(rrd => { 
      val entryToSave = MongoDBObject() 
      val termWithTweets = MongoDBObject() 
      rrd.persist() 
      filters.foreach(term =>{ 
      var listOfTweets = MongoDBObject() 
      rrd.persist() 
      for(status <- rrd){ 
       if(status.getText.contains(term)) { 
    //   listOfTweets += status 
//Why doesnt this line below actually add the key value pair to the variable 
//defined outside of the "for(status <- rrd)" loop? I know (through debugging) 
//that it does in fact append inside the loop. 

       listOfTweets += (DateTime.now.toString() -> status.toString) 
       } 
      } 
//when I print the listOfTweets outside of the for loop it is empty, Why? 
      println("outsideRRD",listOfTweets) 
       termWithTweets += (term -> listOfTweets) 
      }) 
      entryToSave += ("data" -> termWithTweets) 
      entryToSave += ("timeInterval" -> (DateTime.lastMinute to DateTime.now).toString) 
      entryToSave += ("terms" -> filters) 
      collection.insert(entryToSave) 
     }) 

我不認爲這是一個val/var問題,雖然它可能是。我試過 兩種方式

回答

1

RDD上的計算分佈在集羣上。您無法從RDD內更新在RDD操作關閉之外創建的變量。它們基本上位於兩個不同的位置:該變量在Spark驅動程序中創建並在工作人員中訪問,應該視爲只讀。

火花支持分佈式cummulators可能在這種情況下使用: Spark Cummulators

另一種選擇(一個我寧願)是RDD的流變換成所需的數據格式和利用foreachRDD方法堅持到二級存儲。這將是解決問題的更有效的方法。它大致看起來像這樣:

val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term))) 
    val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x))) 
    filteredStreamWithTs.foreachRdd(rdd => // write to Mongo) 
+0

感謝您的跟進。這確實是更實用的方法。我沒有意識到這是一種RRD操作必須發生的方式。 – 2014-10-13 21:59:20