2017-05-25 141 views
1

我是新來弗林克 創建文件(CSV或文本)我有改造想這樣弗林克流上的時間窗口

val supportTask= customSource 

    .map(line => line.split(",")) 
    .map(line => SupportTaskNew(line(0)toInt,line(1).toString,line(2)toString,line(3)toLong,line(4).toString,line(5)toInt,line(6)toInt)) 
    .filter(_ => true) //todo put sent date condition 
    .map(line => Count(1)) 
    .keyBy(0) 
    .timeWindow(Time.seconds(20)) //todo for time being 10 seconds, actuals 30 min 
.sum(0) 

現在我想20秒鐘的間隔時間窗口

supportTask.writeAsText(("D://myfile_"+Calendar.getInstance().get(Calendar.SECOND)),WriteMode.NO_OVERWRITE).setParallelism(1) 
創建文件

我已經提供了文件名+秒,以便每次創建文件時附加秒。

但是這裏只有一個文件被創建,我想創建每20秒窗口的新文件我該怎麼做?

+0

使用** DataStream.writeUsingOutputFormat()API **。 ** writeAsText **將所有輸出記錄寫入參數中指定的文件。你必須實現一個特殊的輸出格式來實現這一點。 – David

回答