0
我是Twisted的新手,這是我的第一個程序。扭曲的python從kafka讀取並寫入elasticsearch
我無法從kafka-python庫中找到使用KafkaConsumer的方法,並使用treq來觸發對elasticsearch的發佈請求。
我能在小塊分解問題: 創建卡夫卡消費迭代器,並從中讀取數據(的話題可能是巨大的)
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
v.value
後使用TREQ
def post(self):
d = treq.post('http://es:9200/pro/pr/', self.data)
d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))
到elasticsearch啓動反應堆
from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()
任何想法如何做這個工作?
感謝您的回答,我仍然試圖讓它工作,但我想我需要先閱讀扭曲的文檔。這並不容易。 – rolele