1
我有一個作業從Cassandra讀取數據並將數據存儲爲List(下面附帶的方法fillOnceGeoFencesFromDB()),並且創建StreamExecutionEnvironment並使用來自Kafka隊列的數據。在Flink作業之間傳遞參數
DataStream轉換期間,我嘗試引用最近填充的靜態ArrayList,但它是空的。
將先前填充的列表傳遞給下一個作業的最佳做法是什麼? 任何想法將不勝感激。
private static ArrayList<GeoFences> allGeoFences = new ArrayList<>();
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
kafkaProps.setProperty("group.id", KAFKA_GROUP);
kafkaProps.setProperty("auto.offset.reset", "earliest");
fillOnceGeoFencesFromDB(); // populate data in ArrayList<GeoFences> allGeoFences
DataStream <Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env
.addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps))
.rebalance().map(new MapFunction<String, Tuple4<UUID, String, String, Timestamp>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception {
return mapToTuple4(value);
}})
。 。 。 。 。 。
感謝您的解釋。你是對的工作 – Mgreg
好聽。隨時接受答案:) – TobiSH
聲望太低的人...我會:) – Mgreg