2017-08-16 160 views
1

問:我怎樣才能查詢碼弗林克內部的特定消費羣的抵消了卡夫卡的話題? (和側面的問題(如果需要的話,會在這裏提出一個新問題)如果可能的話,我可以得到該偏移量的時間戳嗎?Flink - 查詢卡夫卡主題用於消費羣體的抵消?

(我發現有cli工具來查詢它,但是這不是我想要的,因爲它不是編程方式弗林克工作中完成的)

上滿問題的一些額外的背景,但我不想讓這個過於開放式的。

我有其中數據將被從kafkaTopic1流入的程序的使用情況下(讓我們稱之爲P1),處理,然後保存到數據庫中。P1將是一個多節點集羣上,以便每個節點將處理許多卡夫卡分區(允許說有該主題的5個節點和50個卡夫卡分區)。如果其中一個節點由於某種原因完全失敗並且正在處理數據,那麼該數據將會丟失。

例如,如果kafkaTopic1上有500條消息並且node2已經拉動了10條消息(因此根據偏移量拉取的下一條消息是消息11),但只有8條消息已經完全處理並保留到數據庫節點失敗,仍然正在處理的2將會丟失。而當節點恢復起來將開始從消息11讀取,跳過兩個丟失的消息(上和技術上卡夫卡分區將開始發送其消息到另一個節點進行處理,以便在該分區的偏移會移動,我們不當節點死亡時,必然確切地知道下一個要處理的消息)。

(注:當節點死亡,假設用戶通知和斷開P1完全所以沒有更多的數據將在這個點進行處理,暫時)。

因此,這是弗林克用武之地。我想做一個flink作業,可以通過用戶的參數告訴P1的使用者組,然後查詢kafka主題(也由用戶提供)以獲取當前偏移量(OS1)。然後,flink作業將設置其偏移量爲kafkaTopic1爲OS1之前的X個時間量(X由用戶通過參數提供)並開始讀取來自kafka主題的消息。然後,它會將它讀取的每條消息與數據庫中的內容進行比較,如果它未在數據庫中找到該消息,則會將其發送到另一個kafka主題(kafkaTopic2),以在重新啓動時由P1處理。

回答

1

如果檢查點是在弗林克作業啓用,那麼你不應該失去消息,因爲弗林克保持偏移內部以及從故障恢復後,就應該從偏移弗林克最後提交的讀取。

現在,如果您仍然希望找到偏移量並重新從偏移量中讀取數據,這會變得棘手,因爲您需要爲給定使用者組找到給定主題的所有分區的偏移量。

我不知道如何從Flink-kafka-Consumer API開箱即可完成此任務,但是您可以將kafka依賴項添加到您的項目中,並從Kafka API創建一個kafkaconsumer。一旦你的消費者,你可以撥打

consumer.position(partition) 

consumer.committed(partition) 

記住,你仍然需要遍歷所有分區讓所有的電流失調

閱讀對這裏的區別:Kafka Javadoc

一旦你有你想從中讀取數據的偏移量,你可以使用類似下面的手動在弗林克作業指定消費者偏移:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); 
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); 
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); 
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); 

myConsumer.setStartFromSpecificOffsets(specificStartOffsets); 

爲弗林克 - 卡夫卡消費更多信息,檢查了這一點Flink Kafka Connector

+0

謝謝爲什麼我需要做的是因爲那將是失敗的程序(P1)是不是一個弗林克,我們需要的弗林克程序我的工作基本上告訴P1的原因:「嘿,你的偏移量是一個50級的時候你只將消息1到25的結果保存到數據庫中。重新處理的消息26到49" 。謝謝你,我會向非弗林克卡夫卡消費者去一個讓你知道事情是如何工作的。 – Jicaar

+0

如果多數民衆贊成的情況下,也許你應該手動提交的偏移量,只有當數據持續到數據庫並且不使用enable.auto.commit。所以基本上,禁用「enable.auto.commit」標誌和手動啓動承諾。這樣,當堅持到外部數據庫的過程中出現故障,它不會被提交到卡夫卡。 –

+0

我建議爲好,但它聽起來就像是太複雜了(是短型)。其中,數據可以保存到數據庫聽起來像的最大障礙多個可能的「出口點」。如果記錄1來通過和花費的時間比紀錄2進行處理,其他值得關注的是記錄2的承諾偏移會被重寫記錄1的偏移這將表明創紀錄的2還沒有被處理。如果是有道理的 – Jicaar