2017-03-06 63 views
1

僅用於學習使用數據流式傳輸的新Spark結構,我嘗試了這種實驗,但不確定是否對流式傳輸功能做了任何錯誤處理。Spark 2.1.0使用本地CSV文件流式傳輸

首先,我開始用靜態的東西,只是用星火2.1.0來的簡單的文本(CSV)文件:

val df = spark.read.format("csv").load(".../spark2/examples/src/main/resources/people.txt") 
df.show() 

,我可以得到這樣的合理輸出(齊柏林下)。

+-------+---+ 
| _c0|_c1| 
+-------+---+ 
|Michael| 29| 
| Andy| 30| 
| Justin| 19| 
+-------+---+ 

和下面的例子中,我只是修改的代碼讀取相同的文件,並提供架構

val userSchema = new StructType().add("name", "string").add("age", "integer") 

val csvDF = spark 
    .readStream 
    .schema(userSchema)  // Specify schema of the csv files 
    .format("csv") 
    .load(".../spark2/examples/src/main/resources/people.csv") 

而且沒有錯誤信息,所以我想將數據寫入到存儲器中,看到了結果與下面的代碼:

val outStream = csvDF.writeStream 
    .format("memory") 
    .queryName("logs") 
    .start() 

sql("select * from logs").show(truncate = false) 

,但是由於沒有錯誤消息,我一直得到「空輸出」與

+----+---+ 
|name|age| 
+----+---+ 
+----+---+ 

這些代碼是在Zeppelin 0.7下測試的,我不確定我是否錯過了這裏的任何東西。同時,我從Apache Spark 2.1.0官方網站$nc -lk 9999嘗試過這個例子,它運行得非常好。

我可以學習如果我做錯了什麼嗎?

[改性&測試]

  1. 我試圖和複製相同的文件people.txt到people1.csv一個下 peopele2.csv people3.csv .../CSV /文件夾
  2. val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
  3. csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()

,我得到這個:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+-------+-----+ 
| name|count| 
+-------+-----+ 
|Michael| 3| 
| Andy| 3| 
| Justin| 3| 
+-------+-----+ 

因此,我可能不會認爲這是一個數據readstream()問題...

回答

0
  1. 文件名是people.txt,不people.csv。 Spark會拋出一個錯誤,指出「路徑不存在」。我只是使用Spark Shell來驗證它。

  2. 輸入路徑應該是一個目錄。使用文件是沒有意義的,因爲這是一個流式查詢。

+0

欣賞。我也嘗試了people.csv,沒有錯誤信息。但在最終輸出時仍然是空的。 –

+0

我的意思是正確的文件名應該是'people.txt'。你碰巧創建了一個名爲'people.csv'的文件夾嗎? – zsxwing

0

你必須在代碼2級的差別: 1.非工作人有「追加」(默認)的輸出模式,但工作一個具有「完整」輸出模式。 2.非工作人員選擇沒有聚合的記錄,但工作人員有groupBy聚合。

我建議你切換到完成輸出模式,並執行groupBy計數,看看它是否修復了這個問題。

相關問題