我試圖使用Spark Scala代碼流式傳輸twitter數據。我能夠獲取數據並創建數據框並查看它。但是,當嘗試提取status.getPlace.getCountry()時,我得到顯示java.lang.NullPointerException。使用Spark的Twitter流式傳輸
星火版本:2.0.0, 斯卡拉版本:
2.11.8試圖用if條件,檢查值等,但不成功。
代碼:
val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))
val filters:Seq[String] = Seq("hadoop")
val cb = new ConfigurationBuilder()
.setOAuthConsumerKey("******")
.setOAuthConsumerSecret("******")
.setOAuthAccessToken("********")
.setOAuthAccessTokenSecret("******").build()
val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter:Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map {status =>
val places = status.getPlace
val id = status.getUser.getId
val date = status.getUser.getCreatedAt.toString()
val user = status.getUser.getName()
val place = places.getCountry()
(id,date,user,place)
}
data.foreachRDD{rdd =>
import spark.implicits._
rdd.toDF("id","date","user","place").show()
}
ssc.start()
ssc.awaitTermination()
是否有來自Twitter的訪問位置信息有任何限制? 任何建議都會有幫助。
感謝
實際上大部分時間'getPlace'和'getCountry'都包含null值,您可以嘗試使用geoLocation而不是 –