2017-10-13 54 views
0

我試圖通過Machine_1中的python腳本向Machine_2中的卡夫卡主題發送一些消息。 Machine_2Machine_1都在同一個網絡中,都是Azure中的虛擬機。Python-Kafka:程序以交互模式運行,而不是以腳本模式運行

代碼:sampl.py

from kafka import KafkaProducer 
Producer = KafkaProducer(bootstrap_servers=['Machine_2:9092']) 
Producer.send('test', 'hello') 

如果我運行上面的代碼作爲

蟒sampl.py

沒有消息到達的Machine_2。但是,如果我做的:

蟒蛇-i sampl.py

然後消息到達的Machine_2。我使用kafka-console-consumer.sh進行了相同的檢查。我做了yum updateMachine_1認爲可能有一些圖書館在這裏失蹤。但沒有運氣。

謝謝。

+0

您正在使用什麼卡夫卡Python和卡夫卡經紀人版本? –

+0

對於遲到的回覆我很抱歉。卡夫卡版本 - -0.10.2.0,pytho kafka模塊 - kafka_python-1.3.5。 – wonder

回答

0

這裏是kafka-python的維護者。 Producer.send('test', b'hello')是異步的,不提供即時交付。你可能看到的是,在生產者有機會完成網絡發送之前,python解釋器正在關閉。

如果您希望在完成腳本之前等待消息發送完畢,則應該使用.get(timeout = ...)。因此,嘗試:

Producer.send('test', b'hello').get(timeout=1000)

或者交替,你可以調用flush()做同樣的,所有未發送的消息:

Producer.flush(timeout=1000)

+0

生產者發送消息的默認週期間隔是多少?如果在超時期限內沒有發送,該怎麼辦? – wonder

+0

kafka-python不使用週期性間隔進行發送。數據可以在內部進行緩衝以創建更大的批次以獲得更高的吞吐量,但默認情況下會禁用該數據。有關更多詳細信息,請參閱文檔:http://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html – dpkp

相關問題