2016-12-02 65 views
2

我的代碼如下:在調用execute()之後是否可以在FLINK CEP中添加新模式?

StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<MyObject> input = env.addSource(new MyCustomSource()); 

Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start"); 

PatternStream<MyObject> patternStream = CEP.pattern(input, pattern); 

......定義我的模式

DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction()); 

resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria)); 

try 
    { 
    env.execute(); 
    } 
    catch (Exception exception) 
    { 
    log.debug("Error while ", exception); 
    } 

此代碼的工作和我想要做什麼,我也得到遵循我設置的模式的結果流。

我想知道的是,如果有可能將新模式應用到此源,我希望以後再添加到環境中,從而在不調用env.execute()的情況下獲得與不同模式匹配的不同模式,因爲當我這樣做除了我的新結果流,我得到多餘的舊結果流(即舊模式得到執行多次)?

+0

如果我正確理解你,你想在開始執行後修改你的Flink作業? Flink的保存點可能會幫助你解決這個用例:http://data-artisans.com/savepoints-part-2-updating-applications/ – twalthr

+0

@twalthr只有在沒有錯誤的情況下才能使用命令行工作,不是嗎?在那種情況下,並不是我的意思,我想動態添加flink cep模式,目前顯然不支持本地模式,並且必須按照Till的答案自己實現。 –

回答

1

目前Flink的CEP庫不支持動態模式更改。因此,一旦你定義了你的模式並開始了你的工作,它將只處理這個定義好的模式。

但是,您可以編寫自己的操作員實現TwoInputStreamOperator接口,該接口接收一個輸入模式定義,並在另一個輸入上接收流記錄(類似於CoFlatMap函數)。對於每種新模式,您都必須在運營商上編譯新的NFA,並將任何新的傳入流元素也送到此NFA。這樣,你可以達到你的預期行爲。未來,我們很可能會將此功能添加到Flink的CEP庫中。

相關問題