我正在研究一個類似的問題(帶有Kafka流出的流式數據的小Flask應用程序)。
你必須做幾件事來設置它。首先,你需要一個KafkaConsumer搶消息:
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver)
consumer.subscribe(topics=['topicid'])
try:
# this method should auto-commit offsets as you consume them.
# If it doesn't, turn on logging.DEBUG to see why it gets turned off.
# Not assigning a group_id can be one cause
for msg in consumer:
# TODO: process the kafka messages.
finally:
# Always close your producers/consumers when you're done
consumer.close()
這大約是最基本的KafkaConsumer。 for
循環會阻塞線程並循環,直到提交最後一條消息。還有consumer.poll()
方法可以在給定的時間內抓取您可以發送的消息,具體取決於您希望如何構建數據流。卡夫卡的設計考慮了長期運行的消費者流程,但如果您正確提交消息,則可以根據需要打開和關閉消費者。
現在你有了數據,所以你可以使用Flask將它流式傳輸到瀏覽器。我對ChartJS並不熟悉,但live streaming from Flask的核心是調用一個循環內的yield
結尾的python函數,而不是在處理結束時使用return
。
查看Michael Grinberg's blog和his followup作爲使用Flask進行流式傳輸的實際示例。 (注意:任何實際上在嚴肅的Web應用程序中流式傳輸視頻的人都可能希望將其編碼爲像使用ffmpy的廣泛使用的H.264之類的視頻編解碼器並將其包裝在MPEG-DASH中......或者可以選擇一個框架,這東西給你。)
你試過了什麼?社區無法解決這個問題,你的問題太過於委員會。 –