2015-11-03 66 views
1

我寫了一個非常簡單的Flink流式作業,它使用FlinkKafkaConsumer082從Kafka獲取數據。Flink + Kafka:爲什麼我會丟失信息?

protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) { 
    Properties result = new Properties(); 
    result.put("bootstrap.servers", getBrokerUrl()); 
    result.put("zookeeper.connect", getZookeeperUrl()); 
    result.put("group.id", getGroup()); 

     return env.addSource(
       new FlinkKafkaConsumer082<>(
         topic, 
         new SimpleStringSchema(), result); 
} 

這很有效,每當我在Kafka上放入主題時,它都會被我的Flink作業接收並處理。現在我試着看看如果我的Flink Job由於某種原因不在線會發生什麼情況。所以我關閉了flink工作,並不斷髮送消息給卡夫卡。然後我再次開始了我的Flink工作,並期待它會處理髮送的消息。

不過,我得到這個消息:

No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}] 

因此,它基本上忽略,因爲弗林克作業的最後關機是來了,剛開始在隊列的末尾閱讀所有郵件。從我收集的FlinkKafkaConsumer082文檔中可以看出,它會自動處理與卡夫卡經紀人同步處理的偏移量。但是,似乎並非如此。

我使用單節點Zookeper安裝(也與Kafka發行版捆綁在一起)的單節點Kafka安裝(附帶Kafka發行版的安裝)。

我懷疑它是某種錯誤配置或類似的東西,但我真的不知道從哪裏開始尋找。有沒有其他人有這個問題,也許解決了它?

回答

2

https://kafka.apache.org/08/configuration.html

集auto.offset.reset到最小(默認情況下它是最大的)

auto.offset.reset

在沒有初始失調怎麼辦在Zookeeper中或者如果 偏移量超出範圍:

最小:自動重置偏移量以最小的偏移

最大:自動復位偏移量最大偏移

別的:拋出異常給消費者。

如果設置爲最大,那麼消費者可能會在其訂閱的主題的分區數量在代理上發生變化時丟失一些消息。爲了防止 分區添加過程中數據丟失,設置auto.offset.reset到 最小

還要確保getGroup()是重啓

+0

謝謝你的建議,但是這會產生相反的問題,現在我得到消息兩次,因爲它從一開始就重新處理消息。我希望消費者能夠從停止的地方找到它,但由於某種奇怪的原因,消費者似乎並不記得它從哪裏離開。這個問題似乎是因爲任何原因沒有存儲在Zookeeper中的「初始偏移量」。也許我需要承認我處理了一條消息,但我沒有找到任何API。 –

+0

你是否檢查過getGroup()是否一樣? –

+0

是的,它是一樣的。它正在從配置文件中讀取。 –

4

後同我找到了原因。您需要在StreamExecutionEnvironment中明確啓用檢查點,以使Kafka連接器將處理後的偏移量寫入Zookeeper。如果不啓用它,Kafka連接器將不會寫入最後一次讀取偏移量,因此當收集Job重新啓動時,它將無法從那裏繼續。所以,一定要記:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
env.enableCheckpointing(); // <-- this is the important part 

阿納託利對改變初始偏移的建議很可能仍然是一個好主意,如果由於某種原因失敗的檢查點。

+2

我已經在Flink提交了一個JIRA,以便在禁用檢查點的情況下向ZK提交偏移量:https://issues.apache.org/jira/browse/FLINK-2974 –

+0

非常感謝:) –