3

我一直在與卡夫卡合作,對消費者羣體下的消費者有點混淆。混淆的核心是是否將消費者實施爲流程或線程。對於這個問題,假設我正在使用高級消費者。卡夫卡消費者 - 消費者流程和主題分區與主題分區的關係如何?

讓我們考慮一個我已經嘗試過的場景。在我的話題中有2個分區(爲了簡單起見,我們假設複製因子只是1)。我用group1組創建了一個消費者(ConsumerConnector)進程consumer1,然後創建了大小爲2的主題計數映射,然後在該進程下生成了2個消費者線程consumer1_thread1consumer1_thread2。它看起來像consumer1_thread1正在消耗分區0consumer1_thread2正在消耗分區1。這種行爲總是確定性的嗎?以下是代碼片段。類TestConsumer是我的消費者線程類。

... 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(2)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

    executor = Executors.newFixedThreadPool(2); 

    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new TestConsumer(stream, threadNumber)); 
     threadNumber++; 
    } 
    ... 

現在,讓我們考慮另一種情況下(我還沒有嘗試,但很好奇),我開始2消費者處理consumer1consumer2都具有相同羣組group1和他們每個人是一個單線程過程。現在我的問題是:

  1. 在這種情況下,兩個獨立的消費者過程(在同一組下)如何與分區相關?它與上面的單進程多線程場景有什麼不同?

  2. 一般來說,消費者線程或進程如何映射/關聯到主題中的分區?

  3. 卡夫卡文檔確實說消費者組下的每個消費者都會消費一個分區。但是,它是指消費者線程(如我的上面的代碼示例)還是獨立的消費者進程?

  4. 有沒有什麼微妙的東西我在這裏丟失關於實現消費者作爲進程vs線程?提前致謝。

回答

7

消費者組可以運行多個消費者實例(多個進程使用相同的group-id)。在使用時,每個分區僅由組中的一個使用者實例消耗。

E.g.如果您的主題包含2個分區,並且您使用2個消費者實例啓動消費者組group-A,則其中每個消費者組都將使用該主題的特定分區中的消息。

如果您啓動具有不同組ID的相同2消費者group-A & group-B那麼來自兩個主題分區的消息將被廣播到它們中的每一個。所以在這種情況下,在group-A下運行的消費者實例將具有來自這兩個主題分區的消息,並且對於group-B也是如此。

瞭解更多關於這對他們documentation

編輯:基於您的評論它說,

我想知道是什麼具有在相同的2個消費者線程之間的有效差異過程而不是2個消費過程(在兩種情況下組相同)

消費者group-id在羣集中是相同/全局的。假設你已經啓動了具有2個線程的進程,然後產生另一個進程(可能在不同的機器上),而同一個groupId有2個線程,那麼kafka將添加這2個新線程以使用來自主題的消息。所以最終會有4個線程負責從同一主題中消費。然後Kafka將觸發重新分配以將分區重新分配給線程,所以可能發生的情況是線程T1 of process P1正在使用的特定分區可能被分配爲被線程T2 of process P2佔用。以下幾行摘自維客頁面

當一個新進程啓動時使用相同的使用者組名稱時,Kafka會將進程的線程添加到可用於使用該主題並觸發'再平衡」。在此重新平衡過程中,Kafka會將可用分區分配給可用線程,可能會將分區移至另一個進程。如果您混合使用新舊業務邏輯,則可能會有一些消息轉到舊邏輯。

+0

我明白你的意思。但是,我最初的問題主要關注的是線程與進程。我想知道在同一個進程下有兩個消費者線程,而不是兩個消費者進程(在這兩種情況下,組是相同的)有什麼區別? –

+1

@AsifIqbal更新了我的回答 – user2720864

+0

感謝您的編輯。這真的回答了我的好奇心。與此同時,我對多個流程做了一些進一步的實驗,並看到重新平衡正在發生:)。 –

1

選擇具有相同標識的多個消費者組實例與單個消費者組實例的主要設計決策是彈性。例如,如果您的單個消費者擁有兩個線程,那麼如果此機器停機,您將失去所有消費者。如果您有兩個具有相同標識的獨立消費者組,每個不同主機上的消費者組都可以承受失敗。理想情況下,每個用戶組在上面應該有兩個線程,因此如果一個主機出現故障,另一個用戶組使用其休眠線程佔用另一個分區。事實上,總是希望擁有比分區多的線程來覆蓋這個因素。

  1. 您可以在不同的主機上運行每個使用者組。對於一個給定的名稱/ ID,只有一個用戶組可以在單個主機上運行,​​因爲它在單個運行時環境中管理其所有線程。
  2. Kafka有一個算法來確定哪些線程/使用者組讀取各個主題分區。卡夫卡試圖以彈性的方式平均分配這些內容。當使用者組發生故障時,它使其他組中的其他線程能夠讀取給定的分區。
  3. 指用戶組中的單個線程。如果線程數多於分區數,那麼其中一些線程將保持休眠狀態,直到其他線程無法提供彈性。
  4. 首選項與韌性有關。因此,對於多個使用相同ID的消費者組設置,我可以在多臺主機上運行,​​使我的應用程序可以容忍失敗。
+0

答案中有很多不正確的東西。首先,您可以使用單個使用者組運行多個使用者主機。偏移在組級別上進行跟蹤,因此使用組來避免組內的線程跨越彼此的腳趾。即如果你設置了2個主機和2個不同的組來獲取相同的主題,那麼最終消息數量將增加一倍。 –

+0

第二件事是,卡夫卡沒有確定線程何時失敗的算法,您需要手動重新生成失敗線程,否則即使對於基於組的高級用戶,該分區也不會被使用。我用4個分區和8個線程測試了這個確切的事情,並且屬於失敗線程的分區只是保持未消耗(即它的偏移量不會移動,直到線程重新生成爲止)。 –

+0

所以如果你有4個分區併產生8個消費者線程,4個線程將不會做任何事情。另一方面,如果你產生了2個線程,每個線程將從2個分區中消耗。 –

相關問題