0
我有一個方案來處理文件中的記錄。文件中的數據按照週期性添加(每毫秒)。所以我需要讀取文件並對其進行處理,同時只處理新添加的記錄。如何僅從文件處理新記錄?
我遇到了基於Spark SQL構建的Spark結構化流的概念。我在做什麼是 -
1)觸發文件流處理每1秒 2)運行星火SQL查詢的文件 3)編寫查詢的輸出上追加模式控制檯。
下面是相同的代碼 -
public static class SparkStreamer implements Runnable,Serializable {
@Override
public void run() {
processDataStream();
}
private void processDataStream() {
Dataset<Row> rowData = spark.readStream().format("Text").load("C:\\Test\\App\\");
Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String row) throws Exception {
return Arrays.asList(row.split("\\|")).iterator();
}
},Encoders.STRING());
Dataset<Row> dataCount = data.select(new Column("value"));
StreamingQuery query = dataCount.writeStream()
.outputMode("append")
.format("console")
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
通過上述實施,執行查詢1次,但這時如果我在文件中添加新的記錄,第二批執行不獲取觸發。
其他意見 -
1)帶輸出模式完全&更新沒有輸出。只有在追加模式下,我獲得了1次輸出。
有人可以幫助解決這個問題? Spark結構化流處理是否支持處理來自文件的數據,因爲普通的Spark Streaming不支持。