2017-10-09 133 views
1

我的代碼做更多的少這樣的設置:卡夫卡流中線程分配的策略是什麼?

// loop over the inTopicName(s) { 

KStream<String, String> stringInput = kBuilder.stream(STRING_SERDE, STRING_SERDE, inTopicName); 
stringInput.filter(streamFilter::passOrFilterMessages).map(processor_i).to(outTopicName); 

// } end of loop 

streams = new KafkaStreams(kBuilder, streamsConfig); 
streams.cleanUp(); 
streams.start(); 

如果有例如num.stream.threads> 1,如何將任務分配給準備和分配的(在循環中)線程?

我想(我不確定)有線程池和一些循環策略的任務分配給線程,但它可以完全動態地在運行時完成,或者在開始時通過創建過濾/映射到結構。

特別是當一個主題正在執行計算密集型任務而其他時間沒有的情況下,我感到非常有趣。是否有可能應用程序會因爲所有線程將分配給耗時的處理器而餓死。

讓我們玩了一下與場景:num.stream.threads=2no. partitions=4每個主題,no. topics=2(huge_topic和slim_topic) 在我的問題的循環一次在應用程序啓動時完成的。如果在循環中我定義了2個主題,並且我從一個主題知道重量加權(huge_topic)的消息,而另一個主題則來自輕量級消息(slim_topic)。 是否有可能num.stream.threads的兩個線程只會忙於來自huge_topic的任務?來自slimm_topic的消息將不得不等待處理?

回答

2

如果有例如, num.stream.threads> 1,如何將任務分配給 準備並分配(在循環中)的線程?

任務分配給使用分區石斑魚的線程。你可以閱讀關於它here。 AFAIK在重新平衡之後被調用,所以它不是一個非常動態的過程。這就是說,我認爲沒有飢餓的選擇。

+0

感謝您的回答。這是迭代。如果我有例如* num.stream.threads *參數,那麼是什麼每個主題有10個分區? –

+0

您可以在一個應用程序中擁有多個處理線程,並且您可以擁有多個應用程序實例。最後你有權決定,而不是卡夫卡。在Kafka docs的線程建模部分中的更多細節(Confluent one):https://docs.confluent.io/current/streams/architecture.html?highlight=num%20stream%20threads#threading-model – Arek

+0

我知道你的頁面曾參考:-)。我通過添加示例擴展了我的問題。 –

1

在內部,Kafka Streams基於分區創建任務。用你的循環例子,假設你有3個輸入題目A,B,C分別有2,4和3個分區。對於這一點,你會得到4任務(即,在所有主題分區的最大數量)與下列分區任務分配:

  • T0:A-0,B-0,C-0
  • T1 :A-1,B-1,C-1
  • T2:              B-2,C-2
  • T3:              B-3

分區按「編號」分組並分配給相應的任務。這是在運行時確定的(即,在您致電KafakStreams#start()之後),因爲在此之前,每個主題的分區數量未知。

如果您不瞭解卡夫卡流的所有內部細節,那麼不建議混亂分組分區 - 您可以非常輕鬆地分解東西!

關於線程:任務限制了線程的數量。對於我們的示例,這意味着您可以擁有最多4個線程(如果您擁有更多線程,那些線程將處於空閒狀態,因爲沒有任何任務留給線程分配)。你如何「分配」這些線程取決於你。您可以使用4個線程(或之間的任何內容)爲單個應用程序實例提供4個單線程應用程序實例。

如果您的任務比線程少,則會根據任務數量(假定所有任務具有相同的負載)以負載均衡方式分配任務。