我寫了一個非常簡單的Flink流式作業,它使用FlinkKafkaConsumer082
從Kafka獲取數據。Flink + Kafka:爲什麼我會丟失信息?
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());
return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}
這很有效,每當我在Kafka上放入主題時,它都會被我的Flink作業接收並處理。現在我試着看看如果我的Flink Job由於某種原因不在線會發生什麼情況。所以我關閉了flink工作,並不斷髮送消息給卡夫卡。然後我再次開始了我的Flink工作,並期待它會處理髮送的消息。
不過,我得到這個消息:
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]
因此,它基本上忽略,因爲弗林克作業的最後關機是來了,剛開始在隊列的末尾閱讀所有郵件。從我收集的FlinkKafkaConsumer082
文檔中可以看出,它會自動處理與卡夫卡經紀人同步處理的偏移量。但是,似乎並非如此。
我使用單節點Zookeper安裝(也與Kafka發行版捆綁在一起)的單節點Kafka安裝(附帶Kafka發行版的安裝)。
我懷疑它是某種錯誤配置或類似的東西,但我真的不知道從哪裏開始尋找。有沒有其他人有這個問題,也許解決了它?
謝謝你的建議,但是這會產生相反的問題,現在我得到消息兩次,因爲它從一開始就重新處理消息。我希望消費者能夠從停止的地方找到它,但由於某種奇怪的原因,消費者似乎並不記得它從哪裏離開。這個問題似乎是因爲任何原因沒有存儲在Zookeeper中的「初始偏移量」。也許我需要承認我處理了一條消息,但我沒有找到任何API。 –
你是否檢查過getGroup()是否一樣? –
是的,它是一樣的。它正在從配置文件中讀取。 –