1
處理卡夫卡的錯誤
我從卡夫卡主題閱讀郵件在斯卡拉通過以下方式:如何從斯卡拉
import org.apache.spark.streaming.kafka.KafkaUtils
val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)
我不知道什麼是處理可能的連接失敗的正確方法,尤其是考慮到我的Spark Streaming作業將在很長時間內運行,並且在此期間肯定會出現一些連接問題。 我希望Streaming作業在連接問題時不會停止,但它應該嘗試自動重新連接並讀取連接失敗之前未處理的所有消息。
我假設我應該正確地設置auto.offset.reset
, auto.commit.interval.ms
等,但是對於正確設置的詳細指導將非常感謝。
是的,但根據我的測試,當動物園管理員或卡夫卡是關閉的,那麼我的代碼停止。不過,我希望它等待並再次檢查kafka隊列,比方說,10分鐘等等。然後我也不想放鬆發送的信息。我怎樣才能控制它與抵消?我應該將它設置爲「最早」還是應該手動控制它? – duckertito
@duckertito這不是你可以配置的東西。 –
我認爲有可能捕獲這種情況。但實際上這很奇怪。如果使用Spark Streaming + Kafka,則表示存在打開的連接。如果Spark Streaming作業長時間運行(例如X個月直到某些維護),那麼肯定會出現連接失敗的情況。那麼,在這種情況下通常會做什麼呢?如何防止這種情況?我應該每天檢查一次Kafka還是如何自動化它? – duckertito