2017-02-22
Scala (Zeppelin): Task not serializable

I want to fetch data from Twitter via streaming. I am collecting the data in the `twt` variable.

val ssc = new StreamingContext(sc, Seconds(60)) 
val tweets = TwitterUtils.createStream(ssc, None, Array("#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")) 
//tweets.saveAsObjectFiles("/models/Twitter_files_", ".txt") 
case class Tweet(createdAt:Long, text:String, screenName:String) 

val twt = tweets.window(Seconds(60)) 
//twt.foreach(status => println(status.getText()))

import sqlContext.implicits._ 

val temp = twt.map(status=> 
    Tweet(status.getCreatedAt().getTime()/1000,status.getText(), status.getUser().getScreenName()) 
    ).foreachRDD(rdd=> 
     rdd.toDF().registerTempTable("tweets") 
    ) 
twt.print 

ssc.start() 

Here is the error:

org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
     at org.apache.spark.SparkContext.withScope(SparkContext.scala:709) 
     at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266) 

Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext 

Answer

The Tweet class is not Serializable, so extend that.

This is a common Spark problem; the stack trace tells you exactly what it is trying to serialize (since Spark 1.3, I believe).
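As a side note on the Serializable point: a Scala case class already implements `Serializable` out of the box. The minimal, Spark-free sketch below (using hypothetical Tweet values, not the original stream) shows that a standalone `Tweet` instance round-trips through plain Java serialization without any `extends Serializable` clause. Since the `Caused by` line above names `StreamingContext`, the failure is likely about what the closure captures rather than the `Tweet` class itself.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Standalone version of the Tweet case class from the question.
case class Tweet(createdAt: Long, text: String, screenName: String)

object TweetSerializationCheck {
  def main(args: Array[String]): Unit = {
    val original = Tweet(1487721600L, "hello #spark", "Bond")

    // Serialize the instance with plain Java serialization.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(original) // succeeds: case classes extend Serializable by default
    out.close()

    // Deserialize and compare; case classes use structural equality.
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = in.readObject().asInstanceOf[Tweet]
    println(copy == original) // prints true
  }
}
```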

I have added it like this: case class Tweet(createdAt: Long, text: String, screenName: String) extends Serializable. Is this the right way? Because it gives me the same error. – Bond

Yes, that is correct. Can you show more of the stack trace? –

http://i.imgur.com/eAT3tCr.png – Bond
