2017-07-28 30 views
0

我通過以下代碼成功地將文件從S3加載到Spark中。它正在工作,但是我注意到1個文件和另一個文件之間存在延遲,並且它們按順序加載。我想通過並行加載來改善這一點。並行加載S3文件Spark

 // Load files that were loaded into firehose on this day 
    var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd 

    // Apply the schema to the RDD, here we will have duplicates 
    val usersDataFrame = spark.createDataFrame(s3Files, schema) 

    usersDataFrame.createOrReplaceTempView("results") 

    // Clean and use partition by the keys to eliminate duplicates and get latest record 
    var results = spark.sql(buildCleaningQuery(job, "results")) 
    results.createOrReplaceTempView("filteredResults") 
    val records = spark.sql("select count(*) from filteredResults") 

我也試圖通過文本文件()方法加載,但然後我有問題轉化RDD [字符串]以RDD [行],因爲後來我就需要移動使用SQL星火。我以如下方式使用它;

 var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD() 

什麼是加載JSON文件(每個大約50MB的多個文件)到Spark的理想方式?我想根據模式驗證屬性,所以稍後我將能夠使用Spark SQL查詢來清理數據。

+2

是否有必要將s3Files更改爲rdd?如果你不把它改成rdd,我相信它會並行地提取文件內容。 – wllmtrng

+0

最終結果將是一個數據框,然後在其上運行Spark SQL查詢並保存到紅移。在不轉換爲RDD的情況下,我將無法遵循該邏輯,除非我錯過了某些東西。 – Mez

+1

var s3Files = spark.sqlContext.read.schema(schema).json(...)。createOrReplaceTempView(「results」) 應該足夠。嘗試一下,看看它是否仍然順序讀取 – wllmtrng

回答

2

發生了什麼是DataFrame被轉換成RDD然後再轉換成DataFrame,然後丟失分區信息。

var s3Files = spark 
    .sqlContext 
    .read.schema(schema) 
    .json(...) 
    .createOrRepla‌​ceTempView("results"‌​) 

應該足夠了,分區信息應該仍然存在,允許json文件同時加載。