Spark on Amazon EMR: "Timeout waiting for connection from pool"

I am running a Spark job on a small three-server Amazon EMR 5 (Spark 2.0) cluster. The job runs for an hour or so, then fails with the error below. I can restart it manually and it works, processes more data, and eventually fails again.

My Spark code is very simple and does not use any Amazon or S3 API directly. It just passes S3 path strings to Spark, which uses S3 internally.

My Spark program simply does the following in a loop: load data from S3 -> process it -> write the data to a different location on S3.
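In sketch form (a minimal PySpark illustration, not my actual code; the bucket, paths, and filter are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-etl-loop").getOrCreate()

# Placeholder loop: the real job iterates over many S3 locations.
for day in ["2016-08-01", "2016-08-02", "2016-08-03"]:
    df = spark.read.text("s3://my-bucket/raw/%s/" % day)      # load from S3
    processed = df.filter(df.value != "")                     # process (placeholder step)
    processed.write.mode("overwrite").text("s3://my-bucket/processed/%s/" % day)  # write to a different S3 location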

My first suspicion is that some internal Amazon or Spark code is not disposing of connections properly, and the connection pool is being exhausted.

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991) 
      at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212) 
      at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:498) 
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 
      at com.sun.proxy.$Proxy44.retrieveMetadata(Unknown Source) 
      at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780) 
      at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428) 
      at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313) 
      at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:85) 
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60) 
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58) 
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
      at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
      at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
      at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) 
      at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
      at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487) 
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) 
      at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:498) 
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
      at py4j.Gateway.invoke(Gateway.java:280) 
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) 
      at py4j.commands.CallCommand.execute(CallCommand.java:79) 
      at py4j.GatewayConnection.run(GatewayConnection.java:211) 
      at java.lang.Thread.run(Thread.java:745) 
    Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool 
      at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226) 
      at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195) 
      at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:498) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy45.getConnection(Unknown Source) 
      at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423) 
      at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
      at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
      at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837) 
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607) 
      ... 41 more 

For further help, you will need to provide a code sample and/or tell us what language your code uses. This type of problem has come up elsewhere on SO, where they wrote some code to deal with lingering S3 connections: http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3 – Kristian


I ran into a similar problem with Spark 1.6.1 in AWS. From the debug logs I could see that each executor fails to release a connection at the end of a task. That is: an executor may start processing Spark task 5, and while processing it, it makes multiple calls to S3 and then closes the connections; but when the task ends (and it wants to start task 6), one connection is left unavailable, never closed. So, with a connection pool of size n, the error appears when an executor starts its (n+1)th task. –


Also getting this problem, and I only use DataFrameWriter and DataFrameReader. How do I close the S3 connections? –

Answer


I ran into this problem with a fairly trivial program on EMR (read data from S3, filter it, write to S3).

I was able to solve it by using the S3A filesystem implementation and setting fs.s3a.connection.maximum to 100, giving it a much larger connection pool. (The default is 15; see Hadoop-AWS module: Integration with Amazon Web Services for more configuration properties.)

Here is how I set the configuration:

// in Scala 
val hc = sc.hadoopConfiguration 
hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") 
hc.setInt("fs.s3a.connection.maximum", 100) 

# in Python (not tested) -- same two settings, via the JVM gateway 
hc = sc._jsc.hadoopConfiguration() 
hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") 
hc.setInt("fs.s3a.connection.maximum", 100) 

For this to work, the S3 URIs passed to Spark must start with s3a://...
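As a fuller sketch (bucket and paths here are hypothetical), the same settings can also be applied at session startup through Spark's spark.hadoop.* property prefix, which forwards them into the Hadoop configuration, and then used with s3a:// URIs:

from pyspark.sql import SparkSession

# spark.hadoop.* properties are copied into the Hadoop configuration,
# equivalent to the hadoopConfiguration() calls above.
spark = SparkSession.builder \
    .appName("s3a-pool-example") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.maximum", "100") \
    .getOrCreate()

# Note the s3a:// scheme: s3:// or s3n:// paths would not pick up the
# fs.s3a.* settings. Bucket and paths are hypothetical.
df = spark.read.text("s3a://my-bucket/input/")
df.write.mode("overwrite").text("s3a://my-bucket/output/")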


I am seeing the same problem. The above doesn't seem to make any difference for me, and we are definitely using s3a URLs. – user894199


Got the same error –
