2016-09-14 56 views
0

【Spark和Amazon EMR:S3連接未關閉】我的應用程序逐行遍歷一個文本文件(每一行對應一個S3目錄),讀入這些目錄中的數據,執行ETL過程,然後把結果寫回S3。它多次在同一個地方失敗(大約80次循環之後),所以我認爲Spark沒有關閉我的S3連接,導致我分配的連接池被耗盡。錯誤是:

16/09/14 19:30:49 INFO AmazonHttpClient: Unable to execute HTTP request: Timeout waiting for connection from pool 
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool 
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226) 
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195) 
    at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.getConnection(Unknown Source) 
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423) 
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015) 
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991) 
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212) 
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 
    at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source) 
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780) 
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428) 
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313) 
    at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:289) 
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:324) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:452) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:458) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:451) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) 
    at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26) 
    at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26) 
    at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:116) 
    at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:105) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at com.webpt.SparkMaxwell$.main(SparkMaxwell.scala:105) 
    at com.webpt.SparkMaxwell.main(SparkMaxwell.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

相關代碼:

val primaryKeyFile = sparkSession.sparkContext.textFile(keys)

// Partition count used for every reshuffle before a write (same value the script always used).
val shufflePartitions = 1500

/** Re-selects every column of `df` under its lower-cased name. */
def withLowerCaseColumns(df: DataFrame): DataFrame =
  df.select(df.columns.map(name => col(name).as(name.toLowerCase())): _*)

/**
 * Builds the input frame for one table: filters `testFreeDF` to rows whose lower-cased
 * "table" column equals `tableName`, replaces "data" with `decryptUDF(data)`, turns each
 * row into a string so `read.json` can parse it, and lower-cases the resulting columns.
 */
def decryptedInputFor(tableName: String): DataFrame = {
  val soloDF = testFreeDF.filter(lower(col("table")).equalTo(tableName))
  val decryptedJson = soloDF
    .withColumn("data", decryptUDF(soloDF("data")))
    .select("data")
    .rdd
    .map(row => row.toString())
  withLowerCaseColumns(sparkSession.sqlContext.read.json(decryptedJson))
}

/**
 * Projects `df` onto `columns` (lower-case names, in exactly that order).
 * A column that `df` has (compared case-insensitively) is kept under its lower-case name;
 * a column it lacks is added as a null literal.
 */
def projectOnto(df: DataFrame, columns: Seq[String]): DataFrame = {
  val originalName = df.columns.map(name => name.toLowerCase -> name).toMap
  val exprs = columns.map { field =>
    originalName.get(field) match {
      case Some(name) => col(name).as(field)
      case None       => lit(null).alias(field)
    }
  }
  df.select(exprs: _*)
}

for (line <- primaryKeyFile.collect()) {

  // keys text file is of syntax: tableName (tab) key,key,key
  val fields = line.split("\t")
  val tableName = fields(0).toLowerCase()
  val tableKeys = fields(1).toLowerCase.split(",").toSeq

  val yesterdayPath = s"$lake/$tableName/$yesterday"
  val todayPath = s"$lake/$tableName/$today"

  val existsInMaster = FileSystem
    .get(new URI(yesterdayPath), sparkSession.sparkContext.hadoopConfiguration)
    .exists(new Path(yesterdayPath))
  val existsInInput = distinctTables.contains(tableName)

  (existsInMaster, existsInInput) match {

    case (true, false) =>
      // Exists only in master: copy yesterday's data to today with lower-cased column names.
      val masterDF = sparkSession.sqlContext.read.avro(s"$yesterdayPath/*").repartition(shufflePartitions)
      withLowerCaseColumns(masterDF).write.mode("append").avro(todayPath)

    case (true, true) =>
      // Exists in both master and input: merge them, letting input rows win per primary key.
      val inputDF = decryptedInputFor(tableName)
      val masterDF = sparkSession.sqlContext.read.avro(s"$yesterdayPath/*").repartition(shufflePartitions)

      // Union of both schemas, input columns first. Both sides are projected onto this one
      // list because `union` pairs columns by position, not by name; a side that lacks a
      // column gets a null literal, so no column is dropped when the schema changes.
      val allColumns = (inputDF.columns ++ masterDF.columns.map(_.toLowerCase)).distinct.toSeq

      // Input rows are tagged "2" and master rows "1". The schema-mismatch path used to tag
      // them the other way round, which made stale master rows win the de-duplication below.
      val merged = projectOnto(inputDF, allColumns).withColumn("old/new", lit("2"))
        .union(projectOnto(masterDF, allColumns).withColumn("old/new", lit("1")))
        .repartition(shufflePartitions)

      // Partition by the primary keys and order by old/new descending; keeping row 1 of each
      // partition keeps the input row when a key exists on both sides.
      val newestFirst = Window.partitionBy(tableKeys.head, tableKeys.tail: _*).orderBy(desc("old/new"))

      merged
        .withColumn("rownum", row_number().over(newestFirst))
        .where("rownum = 1")
        .drop("rownum")
        .drop("old/new")
        .write.mode("append").avro(todayPath)
      // NOTE(review): the former unpersist() calls were dropped because nothing in this
      // snippet persists these frames — confirm no caller caches them elsewhere.

    case (false, true) =>
      // Exists only in input: decrypt, lower-case the columns, and write it as today's master.
      decryptedInputFor(tableName).write.mode("append").avro(todayPath)

    case (false, false) =>
      // Listed in the keys file but present in neither master nor input: nothing to do.
      ()
  }
}

失敗發生在下面這一行:

// Selects every masterDF column under its lower-cased name and appends the result as Avro to today's path for the table.
masterDF.select(masterDF.columns.map(name => col(name).as(name.toLowerCase())): _*).write.mode("append").avro(s"$lake/$tableName/$today") 

所以,我該怎麼辦?有什麼方法可以確保Spark在讀取/寫入S3之後關閉連接?

+0

附上我提交的一些相關問題以及收到的回覆: https://issues.apache.org/jira/browse/SPARK-17544 和 https://github.com/databricks/spark-avro/issues/156 。看起來是 Spark 或 spark-avro 的問題 –

回答