2016-02-05 24 views
2

該項目是爲real time search engine - log analysis表現。瓶 - 拉直播kafka數據 - 集成卡夫卡與Python瓶

我有一個從Spark處理到卡夫卡的實時流數據。

現在與卡夫卡輸出, 我想get the data from the Kafka using Flask ..和visualize it using Chartjs或其他一些可視化..

我如何從Kafka using the python flask實時數據流?

任何想法如何開始?

任何幫助將不勝感激!

謝謝!

+0

你試過了什麼?社區無法解決這個問題,你的問題太過於委員會。 –

回答

0

我正在研究一個類似的問題(帶有Kafka流出的流式數據的小Flask應用程序)。

你必須做幾件事來設置它。首先,你需要一個KafkaConsumer搶消息:

from kafka import KafkaConsumer 
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver) 
consumer.subscribe(topics=['topicid']) 

try: 
    # this method should auto-commit offsets as you consume them. 
    # If it doesn't, turn on logging.DEBUG to see why it gets turned off. 
    # Not assigning a group_id can be one cause 
    for msg in consumer: 
     # TODO: process the kafka messages. 
finally: 
    # Always close your producers/consumers when you're done 
    consumer.close() 

這大約是最基本的KafkaConsumer。 for循環會阻塞線程並循環,直到提交最後一條消息。還有consumer.poll()方法可以在給定的時間內抓取您可以發送的消息,具體取決於您希望如何構建數據流。卡夫卡的設計考慮了長期運行的消費者流程,但如果您正確提交消息,則可以根據需要打開和關閉消費者。

現在你有了數據,所以你可以使用Flask將它流式傳輸到瀏覽器。我對ChartJS並不熟悉,但live streaming from Flask的核心是調用一個循環內的yield結尾的python函數,而不是在處理結束時使用return

查看Michael Grinberg's bloghis followup作爲使用Flask進行流式傳輸的實際示例。 (注意:任何實際上在嚴肅的Web應用程序中流式傳輸視頻的人都可能希望將其編碼爲像使用ffmpy的廣泛使用的H.264之類的視頻編解碼器並將其包裝在MPEG-DASH中......或者可以選擇一個框架,這東西給你。)