2017-07-29 73 views
2

我正在使用Apache Flink和KafkaConsumer讀取卡夫卡主題中的一些值。 我也有一個從閱讀文件中獲得的流。Apache Flink Dynamic sink of sink

根據收到的值我想寫這個流在不同的卡夫卡主題。

基本上,我有一個網絡與一個領導鏈接到許多孩子。對於每個孩子來說,領導者需要在特定兒童卡夫卡主題中編寫流派流,以便孩子可以閱讀。 當孩子開始時,它會註冊在領導者提供的卡弗卡話題中。 問題是我不知道我有多少孩子。

例如,我從卡夫卡主題中讀取1,我想僅在一個名爲Topic1的卡夫卡主題中寫入流。 我讀了1-2我想寫兩個卡夫卡主題。 (Topic1和Topic2)

我不知道是否有可能,因爲爲了寫上主題我使用Kafka Producer和AddSink方法,並且我的理解(以及我的試驗)似乎是這樣的Flink需要知道接收器的數量。

但是,那麼沒有辦法獲得這樣的行爲?

回答

1

如果我很好地理解了您的問題,我認爲您可以使用單個接收器解決問題,因爲您可以根據正在處理的記錄選擇卡夫卡主題。似乎來自源的一個元素可能會寫入多個主題,在這種情況下,您需要FlatMapFunction將每個源記錄複製N次(每個輸出主題一個),我建議將其輸出爲一對(又名Tupple2)與(主題,記錄)。

DataStream<Tupple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() { 
    public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) { 
     for (String topic : topics) { 
      out.collect(Tupple2.of(topic, value)); 
     } 
    } 
}); 

然後你可以使用以前創建的FlinkKafkaProducerKeyedSerializationSchema計算的題目中,你實現getTargetTopic返回對的第一個元素。

stream.addSink(new FlinkKafkaProducer10<>(
     "default-topic", 
     new KeyedSerializationSchema<>() { 
      public String getTargetTopic(Tupple2<String, MyValue> element) { 
       return element.f0; 
      } 
      ... 
     }, 
     kafkaProperties) 
); 
相關問題