2016-01-21 77 views
0

我正在使用creatDirectStream爲了整合SparkStreaming和Kafka。這裏是我使用的代碼:kafka將sparkstreaming到HDFS

val ssc = new StreamingContext(new SparkConf, Seconds(10)) 
    val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667") 
    val topics = Set("topic1") 

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 

現在我想存儲消息到HDFS。這樣做是對的嗎?

messages.saveAsTextFiles("/tmp/spark/messages") 
+0

是的,這是正常的 –

回答

0

saveAsTextFiles("/tmp/spark/messages") - 這將堅持本地文件系統的情況下,所提供的文件夾結構數據(「/ tmp目錄/火花/信息」)是當地的HDFS的一部分,那麼它也將顯示在HDFS目錄,因爲saveAsTextFiles利用相同的MapeReduce API來編寫輸出。

以上將適用於Spark Executors和HDFS在同一臺物理機器上的情況,但是如果您的HDFS目錄或URL不同,並且不在執行器運行的相同機器上,那麼這將不起作用。

如果您需要確保您的數據在HDFS中保存,那麼作爲一個良好的習慣,您應該始終提供完整的HDFS URL。這樣的事情 - saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")

,或者您也可以利用以下兩種方法之一: -

  1. DStream.saveAsNewAPIHadoopFiles(<HDFS URL with Location>)
  2. DStream.saveAsHadoopFiles(<HDFS URL with Location>)
+0

我不知道這是不是在所有情況下都是如此,我使用的是正常路徑,默認情況下爲hdfs –

+0

也不是這樣,它會創建它們不在的文件夾,或者您可以保存到您想要的根目錄 –

+0

@SebastianPiu - 我從來沒有說過它不會創建文件夾。您可能需要再次閱讀我的答案。當您僅針對文件夾路徑提供完整的HDFS URL時,就會有所不同。再次,這是一個很好的做法,總是提供完整的URL +路徑 – Sumit

相關問題