2016-11-07 20 views
1

我創建了一個帶有配置的流式傳輸環境,並嘗試使用方法RichMapFunction訪問此配置。如何在DataStream程序中配置用戶功能?

實施例:

Configuration conf = new Configuration(); 
    conf.setBoolean("a", true); 
    StreamExecutionEnvironment env = 
     StreamExecutionEnvironment.createLocalEnvironment(8, conf); 

    DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5); 
    source.map(new RichMapFunction<Integer, Integer>() { 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      boolean a = parameters.getBoolean("a", false); 
      super.open(parameters); 
     } 

     @Override 
     public Integer map(Integer value) throws Exception { 
      return value; 
     } 
    }).print(); 

    env.execute(); 

然而,調試open()方法時,我發現的配置是空的。

我在做什麼錯?如何在流式傳輸環境中正確地將配置傳遞給RichFunction

回答

1

在您的示例中,Flink的DataStream和DataSet API共享相同的用戶函數接口,如RichMapFucntion

Flink RichFunctionopen方法的參數Configuration是來自DataSet API的第一個版本的遺留問題,並未在DataStream API中使用。 Flink會將您在map()調用中提供的對象序列化並將其發送給並行工作人員。因此,您可以直接在對象中將參數設置爲常規字段。

+0

您可能需要考慮使用此信息更新文檔。所有最新的文檔都顯示了'withParameters'方法可用,但沒有找到的地方。 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html –

+0

是的,好點。它已經被更新爲下一個版本(1.4)https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html。不知道爲什麼這個修補程序沒有回到1.3分支。 –

相關問題