僅用於學習使用數據流式傳輸的新Spark結構,我嘗試了這種實驗,但不確定是否對流式傳輸功能做了任何錯誤處理。Spark 2.1.0使用本地CSV文件流式傳輸
首先,我開始用靜態的東西,只是用星火2.1.0來的簡單的文本(CSV)文件:
val df = spark.read.format("csv").load(".../spark2/examples/src/main/resources/people.txt")
df.show()
,我可以得到這樣的合理輸出(齊柏林下)。
+-------+---+
| _c0|_c1|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
和下面的例子中,我只是修改的代碼讀取相同的文件,並提供架構
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.schema(userSchema) // Specify schema of the csv files
.format("csv")
.load(".../spark2/examples/src/main/resources/people.csv")
而且沒有錯誤信息,所以我想將數據寫入到存儲器中,看到了結果與下面的代碼:
val outStream = csvDF.writeStream
.format("memory")
.queryName("logs")
.start()
sql("select * from logs").show(truncate = false)
,但是由於沒有錯誤消息,我一直得到「空輸出」與
+----+---+
|name|age|
+----+---+
+----+---+
這些代碼是在Zeppelin 0.7下測試的,我不確定我是否錯過了這裏的任何東西。同時,我從Apache Spark 2.1.0官方網站$nc -lk 9999
嘗試過這個例子,它運行得非常好。
我可以學習如果我做錯了什麼嗎?
[改性&測試]
- 我試圖和複製相同的文件people.txt到people1.csv一個下 peopele2.csv people3.csv .../CSV /文件夾
val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()
,我得到這個:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
| name|count|
+-------+-----+
|Michael| 3|
| Andy| 3|
| Justin| 3|
+-------+-----+
因此,我可能不會認爲這是一個數據readstream()問題...
欣賞。我也嘗試了people.csv,沒有錯誤信息。但在最終輸出時仍然是空的。 –
我的意思是正確的文件名應該是'people.txt'。你碰巧創建了一個名爲'people.csv'的文件夾嗎? – zsxwing