2017-12-18 130 views
0

我需要知道如何使用「爲」我的卡夫卡KStreams線環......下面是我的「for」循環需要被列入KStreams「爲」卡夫卡KStreams循環支持

for (int i = 0; i < 6 ; i++) { 
      try { 
       textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); 
       Thread.sleep(2000); 
      }catch (InterruptedException e){ 
       e.printStackTrace(); 
      } 
     } 

和我KStreams loooks像

KStream<String, String> textlines = builder.stream("intopic"); 
KStream<String, String> mstream = textlines 
       .mapValues(value -> value.replace("[","")) 

如何添加我上面的「for」循環到我KStreams

+1

這個for循環的確切目的是什麼? KStream對象只是一種構建將在其他線程中運行的拓撲(在.start()調用之後)的方法。在你的代碼中,你只需要在你的拓撲結構中增加6倍於同一個處理器,睡眠部分對流的執行沒有任何影響,但是隻會延緩拓撲構建。 – nbchn

+0

@nbchn ok ...事情是我在'for'循環中使用了value.split來分割我的數據....所以每當我的數據被分割時它應該會睡10ms左右...這是因爲我需要我的數據來一個接一個,如果你需要更多的細節讓我知道 –

回答

1

的東西是我用過value.split在「for」循環分裂我的數據.. ..所以,每當我的數據分裂它應該睡10ms左右...這是因爲我需要我的數據來一個接一個

從你說你想要訂購。要達到您的要求,您不需要sleep。它會工作。 Kafka Streams WordCount示例(我假設您的代碼基於此示例)的作用相同:它也使用flatMapValues,傳入平面貼圖的lambda將文本行分割爲單詞。

除非我和其他人誤解了你的問題(在這種情況下,你可能應該進一步澄清你的問題),我認爲你不必要地複雜了你的代碼。

+0

好吧,因爲你說.... flatMapValues使數據來一個接一個? –

+0

[鏈接](https://stackoverflow.com/questions/47817572/adding-for-loop-with-delay-in-kafka-streams)你可以看看這個例子...這正是我所需要的做... –

+0

這是相同的答案。只需使用'flatMapValues',它可以確保數據一個接一個。 –