處理程序的實現非常簡單。實際上,設置環境需要比執行處理程序更多的時間。
處理程序構造函數接受可選參數key
。如果提供了,則寫入的消息將被髮送到由此密鑰指定的單個分區。如果未提供,則消息將按循環方式在服務器之間分發。
我還沒有測試過它,但它很簡單,我沒有看到這裏可能會出現什麼問題。希望它會有用。
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer,KeyedProducer
import logging,sys
class KafkaLoggingHandler(logging.Handler):
def __init__(self, host, port, topic, key=None):
logging.Handler.__init__(self)
self.kafka_client = KafkaClient(host, port)
self.key = key
if key is None:
self.producer = SimpleProducer(self.kafka_client, topic)
else:
self.producer = KeyedProducer(self.kafka_client, topic)
def emit(self, record):
#drop kafka logging to avoid infinite recursion
if record.name == 'kafka':
return
try:
#use default formatting
msg = self.format(record)
#produce message
if self.key is None:
self.producer.send_messages(msg)
else:
self.producer.send(self.key, msg)
except:
import traceback
ei = sys.exc_info()
traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
del ei
def close(self):
self.producer.stop()
logging.Handler.close(self)
kh = KafkaLoggingHandler("localhost", 9092, "test_log")
#OR
#kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox", "dog")
try:
import math
math.exp(1000)
except:
logger.exception("Problem with %s", "math.exp")
P.S.處理程序使用此Kafka客戶端:https://github.com/mumrah/kafka-python
爲什麼不自己實現處理程序?它只需要重寫一些方法。 – Wildfire
我並不十分熟悉伐木或卡夫卡的膽量 - 所以希望能有一個完整的解決方案,以適應我目前的衝刺。鑑於這兩個應用程序的未知數和這個應用程序的重要性,如果我自己寫這一切,我將不得不推出這個應用程序。 – KenFar