2017-06-16 83 views

回答

1

檢查點是需要保存有關流處理的數據的信息,並在失敗的情況下,火花可以從上次保存的進度點恢復。處理意味着它從源讀取,(轉換)並最終寫入接收器。

因此,不需要單獨設置讀取器和寫入器的檢查點,因爲恢復後不處理僅讀取但未寫入接收器的數據,這是沒有意義的。此外,checkpointing location can be set as an option to DataStreamWriter only(從dataset.writeStream()返回)和開始流之前。

下面是一個簡單的結構化流的與檢查點的例子:

session 
    .readStream() 
    .schema(RecordSchema.fromClass(TestRecord.class)) 
    .csv("s3://test-bucket/input") 
    .as(Encoders.bean(TestRecord.class)) 
    .writeStream() 
    .outputMode(OutputMode.Append()) 
    .format("csv") 
    .option("path", "s3://test-bucket/output") 
    .option("checkpointLocation", "s3://test-bucket/checkpoint") 
    .queryName("test-query") 
    .start(); 
相關問題