2017-07-13 41 views
0

我有一個方案來處理文件中的記錄。文件中的數據按照週期性添加(每毫秒)。所以我需要讀取文件並對其進行處理,同時只處理新添加的記錄。如何僅從文件處理新記錄?

我遇到了基於Spark SQL構建的Spark結構化流的概念。我在做什麼是 -

1)觸發文件流處理每1秒 2)運行星火SQL查詢的文件 3)編寫查詢的輸出上追加模式控制檯。

下面是相同的代碼 -

public static class SparkStreamer implements Runnable,Serializable { 
    @Override 
    public void run() { 
     processDataStream(); 

    } 

    private void processDataStream() { 

     Dataset<Row> rowData = spark.readStream().format("Text").load("C:\\Test\\App\\"); 

     Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() { 

      @Override 
      public Iterator<String> call(String row) throws Exception { 
       return Arrays.asList(row.split("\\|")).iterator(); 
      } 


     },Encoders.STRING()); 

     Dataset<Row> dataCount = data.select(new Column("value")); 


     StreamingQuery query = dataCount.writeStream() 
        .outputMode("append") 
        .format("console") 
        .start(); 
     try { 
      query.awaitTermination(); 
     } catch (StreamingQueryException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

通過上述實施,執行查詢1次,但這時如果我在文件中添加新的記錄,第二批執行不獲取觸發。

其他意見 -

1)帶輸出模式完全&更新沒有輸出。只有在追加模式下,我獲得了1次輸出。

有人可以幫助解決這個問題? Spark結構化流處理是否支持處理來自文件的數據,因爲普通的Spark Streaming不支持。

回答

1

是否從文件

是星火結構化數據流支持處理數據。

查詢被執行1次,但如果我在文件中添加新記錄,第二批執行沒有被觸發。

這不會爲右後一度因爲看到它被標記爲一個文件已被處理,永不再(回顧FileStreamSource,負責爲它找到它是如何工作的蓋子下)處理工作。

推薦的解決方案是將新內容寫入新文件。

相關問題