2016-04-20 118 views
3

我有一個簡單的Spark應用程序,它從Kafka讀取數據,然後在http端點(或另一個kafka--對於這個問題讓我們考慮http)轉換後發送這些數據。我使用job-server提交作業。暫停Spark Streaming作業

我目前正在開始消耗源自「auto.offset.reset」=「最小」且間隔= 3s的kafka。在快樂的情況下,一切都很好。下面是摘錄:

kafkaInputDStream.foreachRDD(rdd => { 
    rdd.foreach(item => { 
    //This will throw exception if http endpoint isn't reachable 
     httpProcessor.process(item._1, item._2) 
    }) 
}) 

由於「auto.offset.reset」 =「最小」,這在處理一個任務大約200K的消息。如果我停止http服務器中間工作(模擬POSTing中的某個問題),並且httpProcessor.process拋出異常,那個Job失敗,任何未處理的內容都將丟失。我發現它每隔3秒就會持續輪詢。

所以我的問題是:

  1. 是我的假設正確,如果在未來3的第二職業,如果它得到了X的消息,只有ÿ可以打一個錯誤之前被處理,其餘的X-Y將不會被處理?
  2. 有沒有辦法暫停卡夫卡的流/消費?例如,如果存在間歇性網絡問題,並且很可能在此期間消耗的所有消息都將丟失。一些不斷重試(可能是指數退避)並且每當http結束點UP時,再次開始消費。

感謝

+1

如果在處理特定作業階段時遇到網絡故障,可以傳播(拋出)異常以使整個作業失敗,並重播失敗的整個批處理。這確實有一些開銷,只有在您的DAG是透明的時候纔有效。 –

回答

0

我覺得春天雲流可以解決你的問題。 卡夫卡是來源。 Spark Streaming是處理器。 Http是sink。 只有當卡夫卡有輸入時,纔會處理Spark Streaming。您無需停止或恢復來自Kafka的輸入。 希望它有幫助。

+0

我不明白 - 您不需要停止/恢復來自Kafka的輸入?如果Http Sink沒有響應呢?可能是網絡中斷或臨時網絡中斷? –

+0

源處理器和接收器像Linux PIPELINE一樣工作。沒有水槽就沒問題,處理器也能正常工作。你可以從[spring-cloud-stream](http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC2/reference/htmlsingle/index.html)和[spring-cloud] -data-flow](http://docs.spring.io/spring-cloud-dataflow/docs/1.0.0.M2/reference/html/) –

+0

如果Http Sink已關閉並且您不想丟失數據從Spark Streaming中,您可以在Spark Streaming和Http Sink之間添加Kafka或Redis。當Http Sink啓動時,它會從Kafka獲取數據。 –

2

是的,你的假設是正確的,如果你的分區失敗,其餘的事件不會被處理目前

但是,有很多參數可以調整以獲得所需的行爲(如果使用DirectKafkaInputDStream)。

讓我們從開始「auto.offset.reset」=「最小」:此參數告訴Kafka從一開始就開始,當沒有存儲當前組的提交時。正如你所提到的,你的RDD在啓動後包含很多消息,我假設你沒有正確地提交你的消息。如果你期望恰好 - 一次語義,你肯定應該考慮跟蹤你的偏移量,因爲DirectKafkaStreamInput顯然沒有跟蹤這個偏移量。

開始偏移事先指定,這DSTREAM不負責犯偏移,這樣就可以控制,僅一次

Comment in the DirectKafkaInputSream Branch 1.6

也就是說目前你的信息再加工,每次你重新開始流媒體作業。

如果在啓動時提交已處理的偏移量並將其傳遞到InputDStream中,那麼偵聽器將從上次提交的偏移量繼續。

關於背壓,DirectKafkaInputDStream已經使用RateController,它估計應在一批中處理多少事件。

要使用它,您必須啓用背壓:

"spark.streaming.backpressure.enabled": true 

您也可以限制「spark.streaming.kafka.maxRatePerPartition」添加上限的批量大小。

如果你想自己控制背壓(也許完全停止消費者一段時間),你可能需要實施一些方法StreamingListener並將它用在你的工作中。你可以例如在每個完成的批次之後決定停止流式作業,或者不使用StreamingListener。

+0

所有優點。您忘記提及CheckPointing了,我已經測試過,如果您希望重新啓動驅動程序進行升級等,我的工作非常出色。我更擔心間歇性故障,例如2分鐘〜10K次潛在故障消息的網絡故障。如果我跟蹤偏移Zookeeper/Cassandra的地方,當網絡恢復並且流再次恢復處理消息時,重播消息將是棘手的。需要多思考一下,謝謝你的回答。如果沒有更好的答案,我會獎勵你的賞金點數。 –

+1

@KP如果您已經被背壓部分隔離,並且您只想處理間歇性故障,那麼請不要失敗。由於每個分區都是按順序使用'Try recoverWithDelay recoverWithDelay ... fail'鏈爲每個元素處理的,因此應該足以在短時間內有效「暫停」流。 – zero323

相關問題