2014-01-13 37 views
2

我有一個大量使用Python日誌記錄模塊的複雜應用程序。如何從Python日誌記錄模塊寫入Kafka?

我需要開始將這些日誌記錄到Kafka集羣中,並且需要確保我不會一直改變數據。

對於我來說,理想的解決方案是爲卡夫卡創建一個新的處理程序 - 並允許日誌同時存在於舊的日誌記錄解決方案和卡夫卡。然後最終關閉舊的日誌處理程序併發送給Kafka。

但是,我沒有看到任何kafka日誌處理程序 - 只有kafka客戶端。添加一個kafka客戶端意味着要追蹤每個當前的日誌記錄調用,並向新的kafka客戶端添加一個單獨的調用。獲得相同的結果將是困難的。

+0

爲什麼不自己實現處理程序?它只需要重寫一些方法。 – Wildfire

+0

我並不十分熟悉伐木或卡夫卡的膽量 - 所以希望能有一個完整的解決方案,以適應我目前的衝刺。鑑於這兩個應用程序的未知數和這個應用程序的重要性,如果我自己寫這一切,我將不得不推出這個應用程序。 – KenFar

回答

12

處理程序的實現非常簡單。實際上,設置環境需要比執行處理程序更多的時間。

處理程序構造函數接受可選參數key。如果提供了,則寫入的消息將被髮送到由此密鑰指定的單個分區。如果未提供,則消息將按循環方式在服務器之間分發。

我還沒有測試過它,但它很簡單,我沒有看到這裏可能會出現什麼問題。希望它會有用。

from kafka.client import KafkaClient 
from kafka.producer import SimpleProducer,KeyedProducer 
import logging,sys 

class KafkaLoggingHandler(logging.Handler): 

    def __init__(self, host, port, topic, key=None): 
     logging.Handler.__init__(self) 
     self.kafka_client = KafkaClient(host, port) 
     self.key = key 
     if key is None: 
      self.producer = SimpleProducer(self.kafka_client, topic) 
     else: 
      self.producer = KeyedProducer(self.kafka_client, topic) 

    def emit(self, record): 
     #drop kafka logging to avoid infinite recursion 
     if record.name == 'kafka': 
      return 
     try: 
      #use default formatting 
      msg = self.format(record) 
      #produce message 
      if self.key is None: 
       self.producer.send_messages(msg) 
      else: 
       self.producer.send(self.key, msg) 
     except: 
      import traceback 
      ei = sys.exc_info() 
      traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr) 
      del ei 

    def close(self): 
     self.producer.stop() 
     logging.Handler.close(self) 

kh = KafkaLoggingHandler("localhost", 9092, "test_log") 
#OR 
#kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1") 

logger = logging.getLogger("") 
logger.setLevel(logging.DEBUG) 
logger.addHandler(kh) 
logger.info("The %s boxing wizards jump %s", 5, "quickly") 
logger.debug("The quick brown %s jumps over the lazy %s", "fox", "dog") 
try: 
    import math 
    math.exp(1000) 
except: 
    logger.exception("Problem with %s", "math.exp") 

P.S.處理程序使用此Kafka客戶端:https://github.com/mumrah/kafka-python

相關問題