0

我是新來的整個卡夫卡/火花的東西。我有Spark Streaming(PySpark)從卡夫卡製作人處獲取數據。它運行良好一分鐘,然後總是拋出一個kafka.common.OffsetOutOfRangeException。卡夫卡消費者版本是0.8(顯然,PySpark不支持0.10)。我在AWS Ubuntu 14.04上有一個擁有3名工作人員的主節點。我不知道這是否相關,但這裏的卡夫卡日誌相對較大(〜1-10kb),因此我調整了生產者/經紀人/消費者配置。數據正在通過罰款,雖然可能比我認爲生產者可能產生的速度慢(這可能是問題的根源?)。卡夫卡+火花流:kafka.common.OffsetOutOfRangeException

類似的問題是由這裏增加的滯留時間/尺寸解決:Kafka OffsetOutOfRangeException

但我的保留時間是一個小時,大小爲1GB的每個節點的server.properties,更重要的是,有一個在星火的時間沒有變化失敗和設置的保留時間/大小。

是否有任何其他可能的調整,也許在Spark Streaming配置?我在網上看到的所有答案都與Kafka配置有關,但對我的情況來說似乎沒有什麼不同。

編輯1:我試過a)有多個流從生產者讀取和b)減慢生產者流本身與time.sleep(1.0)。沒有持久的效果。

n_secs = 1 
ssc = StreamingContext(sc, n_secs) 
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], { 
        'bootstrap.servers':'localhost:9092', 
        'group.id':'test-video-group', 
        'fetch.message.max.bytes':'15728640', 
        'auto.offset.reset':'largest'}) for _ in range(n_streams)] 

stream = ssc.union(*kds) 
+0

看起來你正在使用0.8的新消費者嗎?我猜測這是由引導服務器而不是zk連接。你如何做抵消? – dawsaw

+0

@dawsaw它自動提交,但在週末我認爲我已經確定這是Spark Streaming中的背壓問題。 – thefourtheye

回答

0

有沒有可能是你的生產產生了太多的信息太快,這樣1G是不夠的每個經紀人? 1G在所有現實中似乎都很低。在Spark Streaming決定了需要在微批處理中處理的偏移量範圍並嘗試根據偏移量從代理中檢索消息後,消息由於大小限制而消失。請增加經紀人的大小,使其像100G一樣大,看看是否能解決你的問題。

+0

我同意(經過一些進一步的測試),它幾乎可以肯定是Spark Streaming無法跟上的生成器生成的消息。但我想我需要改變我正在製作的和/或Spark配置,而不是更改Kafka配置。 – thefourtheye

相關問題