2017-07-17 242 views
0

我使用卡夫卡消費者從幾個主題閱讀,我需要那些之一,具有更高的優先級。處理需要很多時間,並且總是有很多消息處於(低優先級)主題中,但我需要儘快處理來自其他消息的消息。卡夫卡消費者 - 主題(S)具有較高優先級

這類似的問題,因爲Does Kafka support priority for topic or message?但這一個是使用舊的API。

在新的API(0.10.1.1)的方法有

KafkaConsumer::pause(Collection) 
KafkaConsumer::resume(Collection) 

但它不是我清楚,如何有效地檢測出有高優先級的主題新的消息,並有必要暫停消費來自其他主題。

任何想法/例子嗎?

+1

您可以檢查是否爲您監視的分區endOffsets比那些分區最後提交的偏移較大。這是如何工作的將是特定實現,但會讓你知道在你投票之前是否有更多的消息需要消費 – dawsaw

回答

1

我終於解決了,因爲dawsaw建議 - 在處理循環中,我存儲所有主題/分區,我從讀:

  • beginningOffsets
  • endOffsets
  • 承諾 - 我不能使用位置,因爲我訂閱主題,而不是分區。

每當(endOffset - commited) > 0任何優先議題,我叫consumer.pause()非優先主題和(endOffset - commited) == 0所有優先議題後再次恢復的。

+0

你能分享你的策略來解決這個問題嗎?假設我們有(總共10 Gbs)低優先級消息和一些高優先級消息。我們有多個消費者和多個生產者。即使我們暫停了消費者,我們也需要暫停所有其他主題的製作者,以便讓您的想法實現。對?您有沒有這方面的經驗,因爲在100個服務和10個主題生態系統中,這似乎幾乎不可能? - 是的,我已閱讀你有關此事的其他問題。謝謝 – JSBach

+0

沒有 - 沒有必要暫停任何製作人 - 這個想法是,你有單個消費者訂閱了幾個主題(其中一些主題是高優先級和其他普通優先級)。在輪詢新消息之前,您需要檢查優先主題的滯後時間。如果任何這些滯後不爲零,這意味着,您需要暫停訂閱正常優先級的主題,而不是「消耗」消費者的時間。處理來自高優先級主題的所有消息後,可以再次恢復正常優先級的消息。 – miran

+0

謝謝。我不能完全違抗。但是它對於大型系統來說味道不好。一旦大壩門打開了大量的數據,我將不得不時刻檢查,如果我正在浪費這個低優先級隊列的資源。我爲什麼要?對。無論如何。再次感謝 – JSBach

0

我猜你可能在你的位置(),並提交()的搭配方法。 位置()方法的偏移的下一個記錄,將被獲取並承諾()方法獲取最後提交的對於給定的分割區的偏移(如文檔中所述)的。 在輪詢較低優先級之前,您可以檢查較高優先級的位置()和提交()。如果position()高於committed(),則可以在較高優先級()上暫停()較低優先級和poll(),然後恢復較低優先級。