2016-02-23 217 views
0

我看到下面的代碼消費來自卡夫卡的消息。有20個分區有1個主題,使用ExecutorService創建20個線程。每個分區有20個消息流。運行此程序時,將讀取20條消息並從主題進行處理。當其中一個線程完成處理時,我假設下一條消息將被讀取。消費消費使用卡夫卡消費者 - Java

如果在100個消息位於主題中的示例場景中,將讀取所有消息並將其保存在內存中,並且每次都將由線程處理20個消息,或者僅在消息之後才從主題讀取消息目前正在處理的線程是否被處理?

public void run(int a_numThreads) { 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(a_numThreads)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

    // now launch all the threads 
    // 
    executor = Executors.newFixedThreadPool(20); 

    // now create an object to consume the messages 
    // 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ConsumerTest(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

編輯:我遇到了這個post的答案。但我有以下問題:

如果有20個分區的單個主題,我可以在2個不同的節點上運行消費者?我應該在每個消費者中提到消息流的數量爲10嗎?當我節點失敗或出現性能問題時,數據流會自動重新平衡到工作節點嗎?

回答

1

是的,您可以在不同的節點上運行多個消費者以使用同一主題。基於機器配置,消息流的數量可以不同。如果它的小機器可以給5個左右。

如果一個節點發生故障,它會自動轉移到加載到其他節點。除了失敗之外,還有其他一些屬性,如topic.metadata.refresh.interval.ms,它們決定何時重新平衡加載。

+0

謝謝帕雷希。如果有20個分區,我可以在兩個節點上分別設置20個分區嗎?如果一個失敗,我想所有的20個分區將由一個消費者中的20個線程處理。 –

+0

分區是在創建主題時指定的。如果您的配置中有1個代理,則所有20個分區都在一個代理中。如果你有兩個經紀人,分配分配。您不需要指定消費級別的分區數量。所以要回答你的這個問題「如果一個人失敗了,我想所有的20個分區將由一個消費者中的20個線程處理」,如果一個失敗,所有分區移動到一個節點並由單個消費者處理。 – Paresh

+0

對不起,我不清楚我的問題。假設我的集羣和20個分區中有2個節點。我假設20個分區將分佈在2個節點上,流量將自行重新平衡。我有一個使用20個線程創建20個消息流並處理數據的消費者項目。如果我在2個節點中部署相同的應用程序,10個分區將由1個實例消費者應用程序處理,其他10個將由另一個實例處理。 –