2017-09-24 50 views
1

我想設置一個回調被解僱,如果生產記錄失敗。最初,我只想記錄失敗的記錄。如何爲kafka-python添加失敗回調kafka.KafkaProducer#send()?

匯合卡夫卡Python庫提供了一種機制用於添加回調:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp]) 
... 
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery 

我怎樣才能實現與卡夫卡蟒蛇kafka.KafkaProducer#send()類似的行爲,而不必使用kafka.SimpleClient#send_produce_request()

回答

2

雖然它使用過時的SimpleClient沒有記錄,這是相對簡單的。無論何時您發送消息,您都會立即收到Future。您可以將回調/ errback可對於那個Future

F = producer.send(topic=topic, value=message, key=key) 
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method) 
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method) 

相關的源代碼在這裏: https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64

我們真的應該記錄這一點,我申請https://github.com/dpkp/kafka-python/issues/1256跟蹤它。