2017-10-08 69 views
0

我正在S3中構建數據湖。因此,我想將原始數據流存儲到s3中,下面是我的代碼片斷,我嘗試了本地存儲。將數據流傳輸到S3

val tweets = TwitterUtils.createStream(ssc, None) 
val engtweets = tweets.filter(status => status.getLang() == "en").map(x => x.getText()) 
    import sql.implicits._ 
engtweets.foreachRDD { rdd => 
    val df = rdd.toDF() 
    df.write.format("json").save("../Ramesh") 
} 

我想在s3中存儲原始數據(整個JSON對象)。

回答

0

您可以簡單地使用saveAsTextFile方法與路徑前綴爲

s3a://<file path> 

需要,您的Amazon S3是建立正確有或沒有憑據。

https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html

+0

謝謝你的回覆。我能夠在本地存儲數據。由於數據流式傳輸,我想將數據存儲在firehose中並將其推送到S3。因此,我用Java編寫了一個存儲到kinesis firehose中的方法,並且工作正常。但是,我無法從Dstreams獲取字符串。 val tweets = TwitterUtils.createStream(ssc,None) val engtweets = tweets.filter(status => status.getLang()==「en」) val statuses = engtweets.map(engtweets => engtweets.toString() )statuses.map(record => record.getBytes()。toString())。print() –

1

就設置在覈心site.xml中的訪問密鑰和祕密密鑰如下:

<property> 
    <name>fs.s3a.access.key</name> 
    <value>...</value> 
</property> 
<property> 
    <name>fs.s3a.secret.key</name> 
    <value>...</value> 
</property> 

一旦你做到了這一點,你應該能夠使用S3協議寫入到S3像:s3a:///

希望這有助於!