2
我的代碼如下:在調用execute()之後是否可以在FLINK CEP中添加新模式?
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyObject> input = env.addSource(new MyCustomSource());
Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);
......定義我的模式
DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}
此代碼的工作和我想要做什麼,我也得到遵循我設置的模式的結果流。
我想知道的是,如果有可能將新模式應用到此源,我希望以後再添加到環境中,從而在不調用env.execute()的情況下獲得與不同模式匹配的不同模式,因爲當我這樣做除了我的新結果流,我得到多餘的舊結果流(即舊模式得到執行多次)?
如果我正確理解你,你想在開始執行後修改你的Flink作業? Flink的保存點可能會幫助你解決這個用例:http://data-artisans.com/savepoints-part-2-updating-applications/ – twalthr
@twalthr只有在沒有錯誤的情況下才能使用命令行工作,不是嗎?在那種情況下,並不是我的意思,我想動態添加flink cep模式,目前顯然不支持本地模式,並且必須按照Till的答案自己實現。 –