2017-06-30 36 views
2

下面的代碼從套接字讀取,但我沒有看到任何輸入進入作業。我有nc -l 1111運行,並傾銷數據雖然,不知道爲什麼我的火花工作無法從10.176.110.112:1111讀取數據。如何從套接字讀取流數據集?

Dataset<Row> d = sparkSession.readStream().format("socket") 
            .option("host", "10.176.110.112") 
            .option("port", 1111).load(); 

回答

2

下面的代碼從套接字讀取,但我沒有看到任何輸入進入工作。

那麼,說實話,你做不是從任何地方讀取任何東西。你只描述你在開始流式管道時做什麼

由於您使用結構化流式傳輸從套接字讀取數據集,因此應該使用start運算符來觸發數據獲取(並且只有在您定義接收器之後)。

start()方法:StreamingQuery啓動流媒體查詢的執行,這將持續輸出結果,以給定的路徑作爲新的數據到達。返回的StreamingQuery對象可用於與流進行交互。

之前start您應該定義數據流的位置。它可能是卡夫卡,文件,自定義流式接收器(可能使用foreach運營商)或控制檯。

我在下面的例子中使用了console sink(aka格式)。我也使用Scala,並將其重寫爲Java作爲您的家庭練習。

d.writeStream. // <-- this is the most important part 
    trigger(Trigger.ProcessingTime("10 seconds")). 
    format("console"). 
    option("truncate", false). 
    start   // <-- and this