我的代碼做更多的少這樣的設置:卡夫卡流中線程分配的策略是什麼?
// loop over the inTopicName(s) {
KStream<String, String> stringInput = kBuilder.stream(STRING_SERDE, STRING_SERDE, inTopicName);
stringInput.filter(streamFilter::passOrFilterMessages).map(processor_i).to(outTopicName);
// } end of loop
streams = new KafkaStreams(kBuilder, streamsConfig);
streams.cleanUp();
streams.start();
如果有例如num.stream.threads> 1,如何將任務分配給準備和分配的(在循環中)線程?
我想(我不確定)有線程池和一些循環策略的任務分配給線程,但它可以完全動態地在運行時完成,或者在開始時通過創建過濾/映射到結構。
特別是當一個主題正在執行計算密集型任務而其他時間沒有的情況下,我感到非常有趣。是否有可能應用程序會因爲所有線程將分配給耗時的處理器而餓死。
讓我們玩了一下與場景:num.stream.threads=2
,no. partitions=4
每個主題,no. topics=2
(huge_topic和slim_topic) 在我的問題的循環一次在應用程序啓動時完成的。如果在循環中我定義了2個主題,並且我從一個主題知道重量加權(huge_topic)的消息,而另一個主題則來自輕量級消息(slim_topic)。 是否有可能num.stream.threads的兩個線程只會忙於來自huge_topic的任務?來自slimm_topic的消息將不得不等待處理?
感謝您的回答。這是迭代。如果我有例如* num.stream.threads *參數,那麼是什麼每個主題有10個分區? –
您可以在一個應用程序中擁有多個處理線程,並且您可以擁有多個應用程序實例。最後你有權決定,而不是卡夫卡。在Kafka docs的線程建模部分中的更多細節(Confluent one):https://docs.confluent.io/current/streams/architecture.html?highlight=num%20stream%20threads#threading-model – Arek
我知道你的頁面曾參考:-)。我通過添加示例擴展了我的問題。 –