我正在使用Kafka流並希望重置一些消費者偏移量從Java到開始。 KafkaConsumer.seekToBeginning(...)
聽起來像做正確的事情,但我與卡夫卡流工作:從卡夫卡流重置消費者偏移量
KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();
我想這取決於具體的流管線我確定這將在引擎蓋下創建了幾位消費者。我可以訪問這些嗎?還是有其他的方式來重置偏置編程?
我正在使用Kafka流並希望重置一些消費者偏移量從Java到開始。 KafkaConsumer.seekToBeginning(...)
聽起來像做正確的事情,但我與卡夫卡流工作:從卡夫卡流重置消費者偏移量
KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();
我想這取決於具體的流管線我確定這將在引擎蓋下創建了幾位消費者。我可以訪問這些嗎?還是有其他的方式來重置偏置編程?
由於您使用的是Kafka Streams,您不僅需要重置消費者偏移量,還需要重置Streams內部狀態存儲區。
幸運的是,有一個與Kafka一起提供的Streams Application Reset Tool。
見https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
大廈漢斯Jespersens答案,我成功地使用這個代碼來執行腳本執行的Java代碼是什麼:
import kafka.tools.StreamsResetter;
StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);
類是我進口卡夫卡核心庫的一部分在使用maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
這是我已經知道了,見上面......這裏的關鍵詞是編程。 – micgn
那麼重置工具是一個程序,它是開源的,所以答案是肯定的,你可以以編程方式重置消費者偏移量和流狀態存儲。 –