2017-05-30 55 views
0

我正在調用一個從kafka生產者發送一些數據的函數,但是在它發送之後,我返回了一個不返回的響應。代碼在返回時卡住了。任何人有任何想法發生什麼事?Kafka Producer停止我的代碼

我的代碼如下,

def postEvent(eventData): 
    print("The eventData is...",eventData) 
    timestamp = datetime.now().__format__("%Y-%m-%d %H:%M:%S") 
    try: 
     producer = KafkaProducer(bootstrap_servers=["host:port"]) 
     data = json.dumps(eventData).encode('utf-8') 
     try: 

      kafkaResponse = producer.send('streamTest', data) 

      response ={'time': str(timestamp), 'kafkaResponse':kafkaResponse.get(), 
         'postResult': 'true'} 
      print('kafaka response is...', response) 
     except ConnectionAbortedError: 
       response ={'time': str(timestamp), 'postResult': 'false'} 
     except kafka.errors.KafkaTimeoutError: 
       response ={'time': str(timestamp), 'postResult': 'false'} 
     print('kafaka response is...', response) 
     return response 
    except kafka.errors.NoBrokersAvailable: 
     response = {'Response':'Kafka Errors... NoBrokersAvailable'} 
     print('kafaka response ', response) 
     return response 

回答

0

這是從你的問題,其return聲明它掛在不清楚。

我測試了你的代碼,它對於我來說完全適用於Kafka 0.10.0.1代理和kafka-python 1.3.5。

這可能是一個卡夫卡集羣網絡的問題,所以這兩個地方,你很可能會掛要麼是: 1. kafkaResponse.get(),而你等待Future解決 2.無可用的經紀人,而經紀人超時。如果您傳入多個經紀人,請記住,他們需要在每次超時之前引發NoBrokersAvailable錯誤。