2016-10-02 25 views
0

我是Twisted的新手,這是我的第一個程序。扭曲的python從kafka讀取並寫入elasticsearch

我無法從kafka-python庫中找到使用KafkaConsumer的方法,並使用treq來觸發對elasticsearch的發佈請求。

我能在小塊分解問題: 創建卡夫卡消費迭代器,並從中讀取數據(的話題可能是巨大的)

def consumeKafka(): 
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest') 
    consumer.subscribe(['kafkapipeline']) 
    for v in consumer: 
     v.value 

後使用TREQ

def post(self): 
    d = treq.post('http://es:9200/pro/pr/', self.data) 
    d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x)) 

到elasticsearch啓動反應堆

from twisted.internet import reactor 
reactor.callWhenRunning(consumeKafka) 
reactor.run() 

任何想法如何做這個工作?

回答

0

我根本不使用卡夫卡,所以我不確定這是否適合您。另外,我假設你在同時運行Kafka和treq時遇到了麻煩。我在Twisted中處理迭代器的一種通用方法是使用inlineCallbacks來等待結果,然後用結果做一些事情。

from twisted.internet import defer 

@defer.inlineCallbacks 
def consumeKafka(): 
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest') 
    consumer.subscribe(['kafkapipeline']) 
    for v in consumer: 
     value = yield v.value 
     # do stuff with value 

然後,你可以簡單地調用這個函數,反應堆將負責其餘部分。所以,你的主要部分看起來就像這樣:

consumeKafka() 
reactor.run() 

注意,consumeKafka()函數返回一個Deferred,以便根據需要添加回調和errbacks。一旦您對此模型感到滿意,請查看Cooperator對象以獲取更多功能。

+1

感謝您的回答,我仍然試圖讓它工作,但我想我需要先閱讀扭曲的文檔。這並不容易。 – rolele