如何使用消息批處理或使用pykafka緩衝區生成kafka主題。我的意思是一個生產者可以在一個生產過程中產生很多信息我知道使用消息批處理或緩衝區消息的概念,但我不知道如何實現它。我希望有人可以幫助我在這裏如何使用消息批處理或使用pykafka緩衝區生成kafka主題
1
A
回答
-1
只需使用send()
方法。你不需要自己管理它。
send()是異步的。當被調用時,它將記錄添加到 的緩衝區中,等待記錄發送並立即返回。這允許生產者 將單個記錄分批以獲得效率。
你的任務僅僅是配置關於這兩道具:的batch_size和linger_ms。
生產者維護每個分區的未發送記錄的緩衝區。 這些緩衝區的大小由'batch_size'配置指定。 使這個更大可以導致更多的批處理,但需要更多的內存(因爲我們通常每個 活動分區都有這些緩衝區之一)。
兩個道具將通過以下方式進行:
一旦我們得到的一個分區記錄的batch_size價值將立即不管此設置的發送,但是如果我們比少爲這個分區積累的這麼多字節,我們將在指定的時間內「徘徊」,等待更多的記錄出現。
0
PyKafka透明地處理生產者中的消息批處理 - 你不必做任何特殊的事情來確保批量生成消息。 Producer
類提供了一組配置選項,可讓您自定義批處理行爲。這些選項的完整列表是在documentation可用的,但也有一些最重要的問題是:
max_queued_messages
- 當你produce()
d比這更多的消息,立即發送min_queued_messages
批次 - 當你produce()
d至少這麼多的郵件,發送批量linger_ms
- 當自最後一批這麼多的時間已經過去了,發送批量
相關問題
- 1. Apache Kafka主題分區消息處理
- 2. 使用pykafka在kafka主題創建上創建多個分區
- 3. 用於kafka主題後處理的spark-streaming批處理間隔
- 4. 如何使用Confluent Kafka處理消費者端的消息處理失敗?
- 5. 如何使kafka-python或pykafka成爲uwsgi和gevent的異步生產者?
- 6. 使用NServiceBus如何處理多個消息作爲批處理?
- 7. Google協議緩衝區,如何處理多個消息類型?
- 8. 如何使用Java中的Kafka 8.2 API生成消息?
- 9. 使用RabbitMQ管理器排隊協議緩衝區消息
- 10. 使用Logstash過濾器處理來自Kafka主題的JSON消息
- 11. AudioRecord:如何使用通用緩衝區來處理和存儲?
- 12. 如何使用Gson從json構建協議緩衝區消息?
- 13. 如何使用kafka生產者將消息發送到同一主題?
- 14. 如何閱讀匯合kafka python中的批處理消息?
- 15. 消息傳遞代理和緩衝區
- 16. 如何僅使用NServiceBus主機處理特定消息主機
- 17. 使下緩衝的emacs跳過*消息*緩衝區
- 18. Emacs禁用*消息*緩衝區
- 19. 如何修改一個kafka主題的消息並使用java發送給另一個kafka主題?
- 20. 處理非常大的String消息的協議緩衝區?
- 21. 如何在DOS批處理中清空鍵盤緩衝區?
- 22. Kafka生產者消息在EC2主題中不可用linux
- 23. Python - 網絡緩衝區處理問題
- 24. 如何使用JNA處理WM_QUERYENDSESSION消息
- 25. 緩衝區漏洞問題 - 生成Shell
- 26. 春季集成kafka如何處理生成kafka時的錯誤
- 27. Spark Streaming + Kafka:如何從kafka消息檢查主題名稱
- 28. 如何處理聲卡緩衝區
- 29. 如何使用批處理
- 30. 如何使用批處理
這個答案適用於卡夫卡-p ython,而不是py指定的OP。 https://github.com/dpkp/kafka-python/blob/0c78f704520a42d0935cb87298dd69f8e4af5894/kafka/producer/kafka.py#L53 –