2017-07-10 45 views
0

以下代碼從S3加載數據,使用SparkSQL清除和刪除重複項,然後使用JDBC將數據保存到Redshift。我也嘗試使用spark-redshift maven dependency並獲得相同的結果。我正在使用Spark 2.0。Spark沒有保存所有數據到紅移

我不明白的是,當顯示加載到內存中的結果時,總和是預期的數字,但是當Spark保存到Redshift時,它總是少一些。不知怎的,並不是所有的記錄都保存了,我也沒有在STL_LOAD_ERRORS中看到任何錯誤。任何人遇到這種情況或有任何想法,爲什麼發生這種情況?

 // Load files that were loaded into firehose on this day 
    var s3Files = spark.sqlContext.read.schema(schema).json("s3://" + job.getAWSAccessKey + ":" + job.getAWSSecretKey + "@" + job.getBucketName + "/"+ job.getAWSS3RawFileExpression + "/" + year+ "/" + monthCheck+ "/" + dayCheck + "/*/").rdd 

    // Apply the schema to the RDD, here we will have duplicates 
    val usersDataFrame = spark.createDataFrame(s3Files, schema) 

    usersDataFrame.createOrReplaceTempView("results") 

    // Clean and use partition by the keys to eliminate duplicates and get latest record 
    var results = spark.sql(buildCleaningQuery(job,"results")) 
    results.createOrReplaceTempView("filteredResults") 

    // This returns the correct result! 
    var check = spark.sql("select sum(Reward) from filteredResults where period=1706") 
    check.show() 

    var path = UUID.randomUUID().toString() 

    println("s3://" + job.getAWSAccessKey + ":" + job.getAWSSecretKey + "@" + job.getAWSS3TemporaryDirectory + "/" + path) 

    val prop = new Properties() 

    results.write.jdbc(job.getRedshiftJDBC,"work.\"" + path + "\"",prop) 

回答

0

使用jdbc意味着火花將試圖反覆做INSERT INTO報表 - 這是大規模在紅移緩慢。這就是爲什麼你沒有看到stl_load_errors中的條目。

我建議你改用spark-redshift庫。它經過了很好的測試,性能會更好。 https://github.com/databricks/spark-redshift

實施例(示出了許多選項):

my_dataframe.write 
    .format("com.databricks.spark.redshift") 
    .option("url", "jdbc:redshift://my_cluster.qwertyuiop.eu-west-1.redshift.amazonaws.com:5439/my_database?user=my_user&password=my_password") 
    .option("dbtable", "my_table") 
    .option("tempdir", "s3://my-bucket") 
    .option("diststyle", "KEY") 
    .option("distkey", "dist_key") 
    .option("sortkeyspec", "COMPOUND SORTKEY(key_1, key_2)") 
    .option("extracopyoptions", "TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF") 
    .mode("overwrite") // "append"/"error" 
    .save() 
+0

是的,我使用的兩個,作爲予說明。我使用JDBC來消除火花紅移出現問題的可能性。 – Mez

+0

也許有些文件被跳過。防止'spark-redshift'庫刪除臨時數據,並檢查'stl_load_commits'對S3文件的列表。 –

相關問題