2017-09-01 71 views
1

我需要定期將kafka使用者的輸出轉儲到excel文件中。我使用下面的代碼:將kafka(kafka-python)轉儲到txt文件

from kafka import KafkaConsumer 
from kafka import KafkaProducer 
import json,time 
from xlutils.copy import copy  
from xlrd import open_workbook 
import pandas 

consumer = KafkaConsumer(bootstrap_servers='localhost:9092') 
KafkaConsumer() 
consumer.subscribe("test") 

rowx=0 
colx=0 

for msg in consumer: 
     book_ro = open_workbook("twitter.xls") 
     book = copy(book_ro) # creates a writeable copy 
     sheet1 = book.get_sheet(0) # get a first sheet 
     sheet1.write(rowx,colx, msg[6]) 
     book.save("twitter.xls") 

現在,我的問題是代碼效率不高。對於我需要打開,寫入並保存excel文件的每條消息。有沒有辦法打開一次,寫入,然後關閉它(對於一批消息,而不是for循環)? tnx

+0

爲什麼關閉文件呢? –

回答

0

是的,打開,寫入,保存並關閉每封郵件效率低下,您可以批量處理。但仍然需要在消耗循環中進行。

msg_buffer = [] 
buffer_size = 100 
for msg in consumer: 
     msg_buffer.append(msg[6]) 
     if len(msg_buffer) >= buffer_size: 
      book_ro = open_workbook("twitter.xls") 
      book = copy(book_ro) # creates a writeable copy 
      for _msg in msg_buffer: 
       sheet1 = book.get_sheet(0) # get a first sheet 
       sheet1.write(rowx,colx, _msg) 
      book.save("twitter.xls") 
      msg_buffer = [] 

你可能認爲這將比nobatch快100倍。

更新評論:

是的,通常我們會留在這個死循環,它在內部使用了民意調查,以獲取新的消息,發送心跳和COMMIT偏移。如果你的目標是消耗這個主題的消息並保存消息,它應該是一個長時間運行的循環。

這是kafka-python設計,你應該像這樣使用消息或使用consumer.poll()。

至於爲什麼你可以使用for msg in consumer:,因爲消費者是一個迭代器對象,它的類實現__iter____next__,它潛在的使用提取器獲取記錄。更多實施細節你可以參考https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py

+0

感謝您的注意。我想知道我們是否永遠留在「消費循環中的味精」中?在這種情況下,你的代碼是OK的。或者是否有任何回調函數(當客戶端真的收到一條消息時,它會調用一個函數來做某事)?什麼是消費者對象的類型?這是一個列表還是什麼? – user2867237

+0

回答udpated。 – GuangshengZuo