2017-08-24 48 views

回答

-1

只需使用send()方法。你不需要自己管理它。

send()是異步的。當被調用時,它將記錄添加到 的緩衝區中,等待記錄發送並立即返回。這允許生產者 將單個記錄分批以獲得效率。

你的任務僅僅是配置關於這兩道具:的batch_sizelinger_ms

生產者維護每個分區的未發送記錄的緩衝區。 這些緩衝區的大小由'batch_size'配置指定。 使這個更大可以導致更多的批處理,但需要更多的內存(因爲我們通常每個 活動分區都有這些緩衝區之一)。

兩個道具將通過以下方式進行:

一旦我們得到的一個分區記錄的batch_size價值將立即不管此設置的發送,但是如果我們比少爲這個分區積累的這麼多字節,我們將在指定的時間內「徘徊」,等待更多的記錄出現。

+0

這個答案適用於卡夫卡-p ython,而不是py指定的OP。 https://github.com/dpkp/kafka-python/blob/0c78f704520a42d0935cb87298dd69f8e4af5894/kafka/producer/kafka.py#L53 –

0

PyKafka透明地處理生產者中的消息批處理 - 你不必做任何特殊的事情來確保批量生成消息。 Producer類提供了一組配置選項,可讓您自定義批處理行爲。這些選項的完整列表是在documentation可用的,但也有一些最重要的問題是:

  • max_queued_messages - 當你produce() d比這更多的消息,立即發送
  • min_queued_messages批次 - 當你produce() d至少這麼多的郵件,發送批量
  • linger_ms - 當自最後一批這麼多的時間已經過去了,發送批量
相關問題