2017-03-23 56 views
0

我有消耗2個主題典型samza任務:從configdataconfig,並將郵件作爲rocksdb本地狀態,以檢查是否從data消息確定。一個samza任務如何能夠佔用超過一個卡夫卡劃分流

如果這兩個主題中的每一個都只有一個分區,則此任務可以正常工作。一旦我將data分爲十個分區,config仍然是一個分區,事情就會改變。默認情況下,samza產生10個任務消耗分區0〜9 data的話題,唯一的任務0的消耗config話題:

task[0] -> config, data[0] task[1] -> data[1] ... task[9] -> data[9]

似乎每個任務都與自己的rocksdb實例初始化,所以唯一的任務[0 ]將所有配置數據存儲在其rocksdb實例中,任務[1〜9]沒有配置數據,因此無法找到傳入數據的配置信息。

我的預期是什麼每項任務從數據分區,這樣的配置流消耗的消息:

task[0] -> config, data[0] task[1] -> config, data[1] ... task[9] -> config, data[9]

有什麼辦法來實現這一目標?

回答

4

輸入流分區的分佈由一個可插入的石斑魚控制,該石斑魚使用「job.systemstreampartition.grouper.factor」配置。默認情況下,此類將傳入流分區跨任務實例分組。默認情況下,我相信它會執行GroupByPartitionId。這就是爲什麼你在任務[0]中看到數據[0]和配置[0]的原因。

您可以實現自定義SSPGrouper。但是,您要查找的是將「數據」流視爲常規輸入流,將「配置」流視爲「廣播」輸入流。廣播意味着Samza作業中的每個任務都從該流的分區中讀取。這樣,每個任務實例都可以使用配置流的數據填充其本地rocksdb。您可以配置廣播流爲: task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]

對於你的情況,你可以配置: task.inputs = <systemName>.data task.broadcast.inputs = <systemName>.config#0

退房Broadcast Streams in Samza

+0

非常感謝,@NavinaRamesh。這個對我有用。 – Aries

+0

根據@NavinaRamesh的建議,我遇到了一個異常:線程中的異常「main」java.lang.IllegalArgumentException:kafka.config#[0]中的格式不正確。廣播流名稱應該採用'system.stream#partitionId'或'system.stream#[partitionN-partitionM]'的格式,這意味着正確的格式應該是'task.broadcast.inputs = .config#0',在我改成正確的格式之後,所有的東西都像魅力一樣。感謝和歡呼,@NavinaRamesh :) – Aries

+0

很高興你能弄清楚:)我已經修復了我的迴應中的配置格式。謝謝! –