2017-07-09 114 views
1

我有一個作業從Cassandra讀取數據並將數據存儲爲List(下面附帶的方法fillOnceGeoFencesFromDB()),並且創建StreamExecutionEnvironment並使用來自Kafka隊列的數據。在Flink作業之間傳遞參數

DataStream轉換期間,我嘗試引用最近填充的靜態ArrayList,但它是空的。

將先前填充的列表傳遞給下一個作業的最佳做法是什麼? 任何想法將不勝感激。

private static ArrayList<GeoFences> allGeoFences = new ArrayList<>(); 

    public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setParallelism(1); 
     env.enableCheckpointing(5000); // checkpoint every 5000 msecs 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

     Properties kafkaProps = new Properties(); 
     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST); 
     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER); 
     kafkaProps.setProperty("group.id", KAFKA_GROUP); 
     kafkaProps.setProperty("auto.offset.reset", "earliest"); 

     fillOnceGeoFencesFromDB(); // populate data in ArrayList<GeoFences> allGeoFences 

     DataStream <Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env 
       .addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps)) 
       .rebalance().map(new MapFunction<String, Tuple4<UUID, String, String, Timestamp>>() { 
        private static final long serialVersionUID = 1L; 

        @Override 
        public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception { 
         return mapToTuple4(value); 
        }}) 

。 。 。 。 。 。

回答

0

請記住,無論map函數發生什麼都會發生在任務管理器上,而main中的所有代碼只用於定義您的工作。

將您的參數顯式傳遞給MapFunction(這將使代碼更易於閱讀)。

private static class GeoFenceMapper implements MapFunction<String, Tuple4<UUID, String, String, Timestamp>> { 

    private ArrayList<GeoFences> allGeoFences; 

    public GeoFenceMapper(ArrayList<GeoFences> allGeoFences) { 
     this.allGeoFences = allGeoFences; 
    } 

    @Override 
    public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception { 
     return mapToTuple4(value); 
    }}) 
} 

,比使用這個新的映射:

DataStream <Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env 
       .addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps)) 
       .rebalance().map(new GeoFenceMapper(fillOnceGeoFencesFromDB())) 

希望這有助於!

+0

感謝您的解釋。你是對的工作 – Mgreg

+0

好聽。隨時接受答案:) – TobiSH

+0

聲望太低的人...我會:) – Mgreg