
I am getting the error below when running my Spark application. It is a large application that runs several stateful (using mapWithState) and stateless operations. It is becoming increasingly difficult to pinpoint the problem because Spark itself hangs, and the only error we see is in the Spark logs rather than in the application logs. The error is "Error cleaning broadcast".

The error occurs only after the application has been running for 4-5 minutes with a 10-second micro-batch interval. I am using Spark 1.6.1 on an Ubuntu server, with Kafka-based input and output streams.

Please note that I am not able to provide a minimal reproducible example: the bug does not occur in unit tests, and the application itself is very large.

Any direction you can give towards solving this would be helpful. Please let me know if I can provide more information.

The error is inlined below:

    [2017-07-11 16:15:15,338] ERROR Error cleaning broadcast 2211 (org.apache.spark.ContextCleaner)
    org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
         at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
         at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
         at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
         at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:77)
         at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
         at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
         at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
         at scala.Option.foreach(Option.scala:236)
         at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
         at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
         at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
         at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
         at scala.concurrent.Await$.result(package.scala:107)
         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

Please check my [answer](https://stackoverflow.com/a/40722515/647053); it looks like your timeout is using the default of 120 seconds. –


@RamGhadiyaram Please look at the first error; it seems to report "Error cleaning broadcast"... Are you saying that it happens because of the timeout itself? The workload does not change over time... it is a Kafka input stream with a constant input rate... – tsar2512


Yes. See [ContextCleaner.scala](https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/ContextCleaner.scala) -> the `doCleanupBroadcast` method. The error message indicates that the cleanup task timed out. Please increase the timeout; it should help you. –

Answer


Your exception message clearly says that it is an RPC timeout, which defaults to 120 seconds. Configure and tune `spark.rpc.askTimeout` to the best value for your workload; see the Spark 1.6 configuration documentation.
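For example, here is a minimal sketch of how the timeout could be raised when building the streaming context (the application name and the 600-second value are illustrative choices, not values from your setup; tune them for your workload):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Illustrative values only: raise the RPC ask timeout (default 120s) so that the
    // ContextCleaner's broadcast-cleanup RPCs get more time to finish under load.
    val conf = new SparkConf()
      .setAppName("my-streaming-app")        // hypothetical application name
      .set("spark.rpc.askTimeout", "600s")   // the timeout named in the exception message
      .set("spark.network.timeout", "600s")  // umbrella default for several network timeouts

    // 10-second micro-batches, matching the interval described in the question.
    val ssc = new StreamingContext(conf, Seconds(10))

The same properties can also be supplied at submission time, e.g. `--conf spark.rpc.askTimeout=600s` on `spark-submit`, instead of hard-coding them.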

Your error message `org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]` ... `at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)` confirms this.


For a better understanding, see the code below from RpcTimeout.scala:

    /**
     * Wait for the completed result and return it. If the result is not available within this
     * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
     * @param awaitable the `Awaitable` to be awaited
     * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
     *         is still not ready
     */
    def awaitResult[T](awaitable: Awaitable[T]): T = {
      try {
        Await.result(awaitable, duration)
      } catch addMessageIfTimeout
    }
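
Note that `awaitResult` is exactly the frame (`RpcTimeout.scala:76`) that appears in your stack trace: `Await.result(awaitable, duration)` times out, and `addMessageIfTimeout` rewraps the `TimeoutException` into the `RpcTimeoutException` that names the controlling property. Raising `spark.rpc.askTimeout` simply lengthens that `duration`.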