我需要知道如何使用「爲」我的卡夫卡KStreams線環......下面是我的「for」循環需要被列入KStreams「爲」卡夫卡KStreams循環支持
for (int i = 0; i < 6 ; i++) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
和我KStreams loooks像
KStream<String, String> textlines = builder.stream("intopic");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[",""))
如何添加我上面的「for」循環到我KStreams
這個for循環的確切目的是什麼? KStream對象只是一種構建將在其他線程中運行的拓撲(在.start()調用之後)的方法。在你的代碼中,你只需要在你的拓撲結構中增加6倍於同一個處理器,睡眠部分對流的執行沒有任何影響,但是隻會延緩拓撲構建。 – nbchn
@nbchn ok ...事情是我在'for'循環中使用了value.split來分割我的數據....所以每當我的數據被分割時它應該會睡10ms左右...這是因爲我需要我的數據來一個接一個,如果你需要更多的細節讓我知道 –