這裏是我們的物聯網平臺的入站郵件傳遞流程:卡夫卡生產者配額
Device ---(MQTT)---> RabbitMQ Broker ---(AMQP)---> Apache Storm ---> Kafka
我期待實現解決方案,有效地限制/節流數據以每個發佈到卡夫卡每秒量客戶基礎。
當前的策略利用Guava的RateLimiter,每個設備都有自己的本地緩存實例。當接收到設備消息時,映射到該deviceId的RateLimiter從緩存中獲取並調用tryAquire()
方法。如果許可證被成功獲得,那麼該元組像往常一樣被轉發到卡夫卡,否則超出配額,並且消息被無聲地丟棄。這種方法相當麻煩,並且在某種程度上註定要失敗或成爲瓶頸。
我一直在閱讀卡夫卡的字節率配額,並相信這可以在我們的案例中完美工作,尤其是因爲可以動態配置卡夫卡客戶端。當在我們的平臺上創建虛擬設備時,應該添加一個新的client.id,其中client.id == deviceId
。
讓我們假定以下使用情況爲例:
- 管理員創建2個虛擬設備:溼度&溫度傳感器
- 規則被激發到對於上述裝置 創建卡夫卡新用戶/的clientId條目
- 通過卡夫卡CLI
- 設置它們的生產配額值兩款器件發出的入站事件消息
- ...?
這是我的問題。如果使用單個Producer實例,是否可以在調用send()
之前在ProducerRecord中或Producer的某處指定client.id
?如果一個生產者只允許一個client.id
,這是否意味着每個設備都必須有自己的生產者?如果只允許一對一的映射,那麼緩存數百甚至數千個Producer實例是否明智呢,每個設備一個呢?有沒有更好的方法我還沒有意識到?
注意:我們的平臺是一個「開門系統」,意味着客戶永遠不會收到錯誤響應,例如「超出率」或任何錯誤。這對最終用戶來說都是透明的。出於這個原因,我不能干涉RabbitMQ中的數據或將消息重新路由到不同的隊列。我唯一的選擇是整合這些東西,位於Storm或Kafka之間。
_你能不能細說,請與expla「海邊的卡夫卡消息頭可以被用來辨別哪些設備實際產生的數據。」在如何實現它?即使我是以每個用戶爲基礎做的,但我仍然需要弄清楚如何告訴Kafka,消息X源自客戶端1,消息Y源自客戶端2,等等......所有這些都通過* *單**,共享生產者實例。 – user2208562