我正在S3中構建數據湖。因此,我想將原始數據流存儲到s3中,下面是我的代碼片斷,我嘗試了本地存儲。將數據流傳輸到S3
val tweets = TwitterUtils.createStream(ssc, None)
val engtweets = tweets.filter(status => status.getLang() == "en").map(x => x.getText())
import sql.implicits._
engtweets.foreachRDD { rdd =>
val df = rdd.toDF()
df.write.format("json").save("../Ramesh")
}
我想在s3中存儲原始數據(整個JSON對象)。
謝謝你的回覆。我能夠在本地存儲數據。由於數據流式傳輸,我想將數據存儲在firehose中並將其推送到S3。因此,我用Java編寫了一個存儲到kinesis firehose中的方法,並且工作正常。但是,我無法從Dstreams獲取字符串。 val tweets = TwitterUtils.createStream(ssc,None) val engtweets = tweets.filter(status => status.getLang()==「en」) val statuses = engtweets.map(engtweets => engtweets.toString() )statuses.map(record => record.getBytes()。toString())。print() –