2017-08-03 31 views
1

我有2個dataframes,我想找到除2等於所有列(surrogate_key,電流)的記錄火花保存服用大量的時間

然後,我要保存新surrogate_key值的記錄。

以下是我的代碼:

val seq = csvDataFrame.columns.toSeq 
var exceptDF = csvDataFrame.except(csvDataFrame.as('a).join(table.as('b),seq).drop("surrogate_key","current")) 
exceptDF.show() 

exceptDF = exceptDF.withColumn("surrogate_key", makeSurrogate(csvDataFrame("name"), lit("ecc"))) 
exceptDF = exceptDF.withColumn("current", lit("Y")) 

exceptDF.show() 

exceptDF.write.option("driver","org.postgresql.Driver").mode(SaveMode.Append).jdbc(postgreSQLProp.getProperty("url"), tableName, postgreSQLProp) 

該代碼給出正確的結果,但在寫這些結果postgre卡住。

不知道是什麼問題。還有沒有更好的方法呢?

問候, Sorabh

+0

還顯示()寫入正確postgre打印數據幀,但後來在寫之前需要太多的時間。 –

+1

'show'不會對整個數據執行轉換,只需要顯示多少數據(默認爲20)。在寫入postgres之前,您需要執行緩存+操作,然後您可以測量寫入postgres的實際時間。 「很多時間」是一個非常廣泛的描述,你有多少數據以及它實際上需要多少時間... – eliasah

+0

嗨Eliasah,我用過cache()+ count(),它需要大約1/2小時只有3行10列,延遲也不是由於寫入Postgre。在添加count()之後,它的count()現在需要時間。 –

回答

1

默認情況下,spark-sql會創建200個分區,這意味着當您嘗試保存datafrmae時,它將被保存在200個parquet文件中。您可以使用以下技術減少Dataframe的分區數量。

  1. 在應用程序級別。設置參數 「spark.sql.shuffle.partitions」 如下:

    sqlContext.setConf( 「spark.sql.shuffle.partitions」, 「10」)

  2. 減少分區數,該特定據幀如下:

    df.coalesce(10).write.save(...)

希望它幫助。

問候,

Neeraj

+0

Thanks Neeraj,That helps .. :) –

0

使用var對數據幀不建議的,你應該總是使用val和數據幀進行一番改造後,創建一個新的數據幀。

請刪除所有的var並替換爲val

希望這會有所幫助!

+0

謝謝,尚卡爾。即使將var更改爲val,問題仍然存在。 –