2016-09-09 34 views
1

您好我正在嘗試使用Apache Spark Streaming讀取來自Twitter的推文,並試圖轉換爲一個DataFrame。我有下面粘貼的方法。但是,我並不能夠得到正確的方法。有些指針會受到歡迎。將一個DStream轉換爲一個數據幀

正如你可以看到轉換到DF內部的foreach並沒有從tweetStream中獲得一個DF。我可能有錯誤的方法,因爲我是新手。我如何解決這個問題?

val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth).filter(status=>status.getLang=="en") 
     .map(status=>gson.toJson(status)) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 
    tweetStream.foreachRDD({status=>val DF = status.toDF()}) 
+0

我想使用一個循環內DF.merge()進去foreachRDD計算整個DF的{} – Ayon

回答

0

我還沒有嘗試過,但也許是這樣工作的:

var df_tweets:DataFrame = null 

    dstream_tweets.foreachRDD { 
    rrd => if (df_tweets != null) { 
     df_tweets = df_tweets.unionAll(rdd.toDF) // combine previous dataframe 
    } else { 
     df_tweets = rdd.toDF() // create new dataframe 
     } 
    } 
相關問題