2017-03-03

I want a simple data transfer from my producer to my consumer in Python using the kafka-python package (1.3.2). I am getting "OverflowError: timeout value is too large" when using the kafka-python producer and consumer.

Producer:

from kafka import KafkaProducer 
producer = KafkaProducer(bootstrap_servers='localhost:9092') 
# produce asynchronously 
for _ in range(2): 
    producer.send('my-topic', b'message') 
    producer.flush() 
producer = KafkaProducer() 

Consumer:

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'],
                         fetch_min_bytes=1)
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

consumer = KafkaConsumer() 
consumer.subscribe(["my-topic"]) 

This is what I receive in my consumer:

my-topic:0:5056: key=None value=b'message'
my-topic:0:5057: key=None value=b'message'

But at the same time, I get this error in my producer:

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 364, in wrapper
    _self.close()
  File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 420, in close
    self._sender.join(timeout)
  File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1060, in join
    self._wait_for_tstate_lock(timeout=max(timeout, 0))
  File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
OverflowError: timeout value is too large

By default the timeout is None, and inside kafka.py it then gets set to 999999999. I cannot figure out which parameter to pass to KafkaProducer in my producer code to override this timeout.
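One hedged workaround (my own sketch, not from the original post): instead of letting the atexit hook call `close()` with the huge default, close the producer explicitly with a small finite timeout before the script ends. `KafkaProducer.close(timeout=...)` exists in the kafka-python API; the clamp helper below is a hypothetical name of my own.

```python
import threading

# kafka-python's atexit hook joins the sender thread with a 999999999 s
# timeout when close() is called without one; on a 32-bit interpreter that
# can exceed what lock.acquire() accepts. Clamping to the platform limit
# (threading.TIMEOUT_MAX, Python 3.2+) avoids the OverflowError.
def clamped_timeout(seconds):
    """Clamp a join/acquire timeout to this platform's maximum."""
    return min(seconds, threading.TIMEOUT_MAX)

# Intended use (needs a running broker, so shown as comments):
# producer.flush()
# producer.close(timeout=clamped_timeout(10))
```

Closing explicitly also means the atexit wrapper finds the producer already closed and has nothing left to join.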

Has anyone come across this issue? Or can anyone point me in the right direction? Thanks in advance.

Answer

I think your problem may stem from the fact that you are on 32-bit Windows. As far as I can tell, there is no explicit 32-bit support in the kafka-python code.
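To see whether this explanation fits your machine, a small diagnostic (my own sketch; the variable names are illustrative) checks the interpreter's bitness and whether kafka-python's 999999999-second join exceeds the platform's lock-timeout limit:

```python
import struct
import threading

# Pointer size in bits: 32 on a 32-bit interpreter, 64 on a 64-bit one.
bits = struct.calcsize("P") * 8

# threading.TIMEOUT_MAX is the largest timeout lock.acquire() accepts;
# if kafka-python's default exceeds it, the OverflowError is expected.
overflow_risk = 999999999 > threading.TIMEOUT_MAX

print(bits, threading.TIMEOUT_MAX, overflow_risk)
```

On a 64-bit build `overflow_risk` is typically False, which would explain why the error shows up only on the 32-bit Python from your traceback paths (`Python36-32`).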

From what I can tell, your implementation otherwise looks fine.