2017-02-23 82 views
3

如何從單個主題創建多個流?當我做這樣的事情:來自單個主題的多個流

KStreamBuilder builder = new KStreamBuilder(); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output1"); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 

我得到以下錯誤:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source. 
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347) 
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92) 

我需要做KafkaStreams的另一個實例從「大師」每個流?

回答

7

您可以創建一個KStream可以重複使用:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master"); 

,那麼你可以重複使用它:

inputStream.filter(..logic1) 
     .to(Serdes.String(), Serdes.String(), "output1"); 
inputStream.filter(..logic2) 
     .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 
+2

如果你沒有在你的過濾器的重疊,你也可以使用' inputStream.branch()'返回非重疊子流。 –