2016-02-02 34 views
2

我只想將流的內容打印到控制檯。我寫了下面的代碼,但它不打印任何東西。任何人都可以幫助我在Spark中讀取文本文件作爲流?是否有與Windows系統有關的問題?在Windows系統中打印流(Spark流)的內容

public static void main(String[] args) throws Exception { 

    SparkConf sparkConf = new SparkConf().setAppName("My app") 
     .setMaster("local[2]") 
     .setSparkHome("C:\\Spark\\spark-1.5.1-bin-hadoop2.6") 
     .set("spark.executor.memory", "2g"); 

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 

    JavaDStream<String> dataStream = jssc.textFileStream("C://testStream//copy.csv"); 
    dataStream.print(); 

    jssc.start(); 
    jssc.awaitTermination(); 
} 

UPDATE:copy.csv的內容是

0,0,12,5,0 
0,0,12,5,0 
0,1,2,0,42 
0,0,0,0,264 
0,0,12,5,0 
+0

'copy.csv'的內容是什麼? – Sumit

+0

它是用逗號分隔的文件。我用內容更新了我的問題 – Eyad

回答

3

textFileStream是監控Hadoop的支持目錄。此操作將監視提供的目錄,並且當您在提供的目錄中添加新文件時,它將讀取/流式傳輸新添加的文件中的數據。

您無法使用textFileStream來閱讀文本/ csv文件,或者我會說,如果您只是閱讀文件,則不需要流式傳輸。

我的建議是監視某個目錄(可能是HDFS或本地文件系統),然後使用textFileStream添加文件並捕獲這些新文件的內容。

可能在你的代碼可能是你可以用C://testStream",一旦你的星火流的工作是建立和運行,然後添加文件copy.csvC://testStream文件夾,看到星火控制檯輸出替換"C://testStream//copy.csv"

OR

可能是你可以編寫讀取該文件的另一個命令行斯卡拉/ Java程序和扔在插座上的內容(在某一端口#)和未來可以用於捕獲和讀取利用socketTextStream數據。一旦您讀取了數據,您將進一步應用其他轉換或輸出操作。

你也可以認爲利用Flume

參考API Documentation更多細節

+0

實際上,我不僅在閱讀這些文件,而且爲了讓我的問題保持清晰,我寫了一小段代碼。實際上,我想將流的內容發送給樸素貝葉斯模型以進行預測(機器學習),但不幸的是,我不能僅將文件作爲流讀取。你能否向我解釋一下,我該如何處理Spark中的csv文件?我不明白如何在Windows中監視HDFS或本地文件系統?我嘗試用C:// testStream替換「C://testStream//copy.csv」,但是當我將這些文件添加到目錄時,這不起作用。!!! – Eyad

+0

我已經詳細闡述了我對閱讀文本的回答文件,就'textFileStream'而言,確保你是使用唯一/唯一名稱創建新文件,並且內容也應該是新的。不要重命名或複製具有相同內容的文件,這是行不通的。 – Sumit

1

這爲我工作在Windows 7和Spark 1.6.3:(刪除的代碼的其餘部分,重要的是如何定義監視的文件夾)

val ssc = ... 
val lines = ssc.textFileStream("file:///D:/tmp/data") 
... 
print 

...

此監控目錄d:/ tmp目錄/數據,SSC是我的海峽eaming方面

步驟:

  1. 創建一個文件1.txt的說在d:/ tmp目錄/數據
  2. 輸入一些文字
  3. 啓動SPART應用
  4. 文件重命名爲數據。 TXT(我相信任何任意的名字會做,只要它改變,而目錄受火花監控)

我注意到的另一件事是臨時我不得不將行分隔符更改爲Unix樣式(使用Notepad ++),否則文件不會被拾取。