2016-11-11 94 views
3

我遇到了一些關於最近開發flink作業的問題,它介紹了spring和hibernate,並且作業將在flink集羣上運行。所以我需要在任務管理器而不是作業管理器上運行flink運算符之前初始化spring資源。但是我找不到任何合適的StreamExecutionEnvironment方法來實現這一點。如何在flink環境中初始化flink作業的spring資源

我已經試過這樣如下一些方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); 
// etl business logic as flink operators 
FlinkOperators.run(); 
env.execute(); 

然而,當弗林克工作,其並行不止一個執行,彈簧初始化將在每個任務管理器進程。所以我不能在flink工作中使用spring。

是否有任何方法來初始化Flink作業的spring資源?

謝謝。

最好的問候。

阿爾文

+0

我現在有問題,你找到解決方案嗎? – zt1983811

回答

0

每當你需要有某種形式的上下文初始化每個任務管理器中,建立一個靜態函數(如果你使用Scala的對象內的函數)存儲靜態變量中的初始化值。

這應該夠了,因爲靜態值存儲在每個任務管理器的內存中。

我使用這個aproach加載每個任務管理器中的屬性文件,這些屬性文件包含每個作業配置。如果您正在加載文件,請檢查每個taskmanager是否有要加載的文件的副本。