2017-08-28 23 views
2

我想讀卡夫卡的消息,所以我寫了簡單的消費者閱讀來自卡夫卡的消息。如何閱讀匯合kafka python中的批處理消息?

While True: 
     message = consumer.poll(timeout=1.0) 
     # i am doing something with messages 

在上面的代碼中消息類型的輸出是消息對象。我如何獲得一系列消息?

是否有可能?

注:只有不多的消費者配置的基本。

回答

5

librdkafka(底層的C庫)僅返回消息逐個應用程序,但在內部,消息由批次從經紀人取出,所以沒有性能缺點。消息在內部緩衝區中排隊,等待您的應用程序進行輪詢。

有結構調整的行爲:

fetch.wait.max.ms(默認爲100),積累的數據提供給經紀人的時候送 fetch.message.max.bytes(默認1048576,1GB),批次 queued.max.messages.kbytes的最大大小(默認1000000),內部隊列中數據的最大大小。如果您不定期輪詢,數據將不會從隊列中清除,您將無法獲取更多數據。

和很多人一樣,你可以在這裏找到:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


如果你真的想要的,因爲你的方式來處理數據的數據數組,你所能做的就是呼籲調查在如環低超時你做,並且當你有x消息或者y ms時停止你的循環,把它們累積在一個集合中。處理生成的數組並重復循環。

這同樣適用於生產:你產生一個數據之一,但消息發送到經紀人之前批處理。

+0

我們可以通過修改underlaying的C代碼返回一批消息?因爲在Python中迭代並只是獲取消息可能會減慢整個過程,從C本身返回一堆消息的速度會更快。 –

+1

之前就是這樣的情況,但它是基準的,由於完成分配的方式,沒有下降(在C中)一次返回一個消息而返回一個批次。在GitHub上我不知道蟒蛇很好,但也許有一個問題(或者你可以這樣討論),這將是比這個堆棧溢出多個鑲嵌 - 你可以(0)在循環使用poll創建批處理 – Treziac

+0

僅供參考(我認爲是你的):https://github.com/confluentinc/confluent-kafka-python/issues/252 – Treziac