2017-08-04 236 views
0

我在運行Spark Streaming應用程序,該應用程序從Kafka(使用Direct Stream方法)讀取數據並將結果發佈回Kafka。應用程序的輸入速率以及應用程序的吞吐量保持穩定大約一兩個小時。之後,我開始在很長一段時間內看到在Active Batches隊列中保留的批次(持續30分鐘+)。該Spark driver日誌表明以下兩種類型的錯誤,而這些錯誤發生的時間與卡住批次的開始時間一致得好:Spark Spark待處理批處理

第一個錯誤類型

ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. 

二錯誤鍵入

ERROR StreamingListenerBus: Listener StreamingJobProgressListener threw an exception 
java.util.NoSuchElementException: key not found: 1501806558000 ms 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:59) 
    at scala.collection.mutable.HashMap.apply(HashMap.scala:65) 
    at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43) 
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) 
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77) 

但是,我不知道如何解釋這些錯誤,儘管廣泛的在線搜索,我合作沒有找到與此相關的任何有用信息。

問題

  1. 什麼這些錯誤是什麼意思?它們是否表示資源限制(例如:CPU,內存等)?
  2. 解決這些錯誤的最佳方法是什麼?

在此先感謝。

回答

0

您的批次持續時間是不是少於實際批處理時間?默認批處理隊列大小爲1000,因此可以溢出Spark Spark批處理隊列。

+0

請考慮重新說明這一點。這看起來像一個平庸的評論,而不是真正回答問題的東西。 – GhostCat