2016-09-08 81 views
1

我正在寫數據(大約83M記錄)從一個數據幀到postgresql,它有點慢。花2.7小時完成寫入數據庫。Spark寫入到postgres慢

看着執行者,只有一個活動任務運行在一個執行器上。有什麼辦法可以使用Spark中的所有執行程序將寫入並行化爲數據庫?

... 
val prop = new Properties() 
prop.setProperty("user", DB_USER) 
prop.setProperty("password", DB_PASSWORD) 
prop.setProperty("driver", "org.postgresql.Driver") 



salesReportsDf.write 
       .mode(SaveMode.Append) 
       .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, prop) 

感謝

+0

您可以添加代碼的一部分寫入PostGres? –

+0

@ThiagoBaldim剛剛爲此發佈了代碼段,謝謝 –

回答

2

所以我想通了這個問題。基本上,對數據幀進行重新分區可將數據庫寫入吞吐量提高100%

def srcTable(config: Config): Map[String, String] = { 

    val SERVER    = config.getString("db_host") 
    val PORT    = config.getInt("db_port") 
    val DATABASE   = config.getString("database") 
    val USER    = config.getString("db_user") 
    val PASSWORD   = config.getString("db_password") 
    val TABLE    = config.getString("table") 
    val PARTITION_COL  = config.getString("partition_column") 
    val LOWER_BOUND  = config.getString("lowerBound") 
    val UPPER_BOUND  = config.getString("upperBound") 
    val NUM_PARTITION  = config.getString("numPartitions") 

    Map(
    "url"  -> s"jdbc:postgresql://$SERVER:$PORT/$DATABASE", 
    "driver" -> "org.postgresql.Driver", 
    "dbtable" -> TABLE, 
    "user" -> USER, 
    "password"-> PASSWORD, 
    "partitionColumn" -> PARTITION_COL, 
    "lowerBound" -> LOWER_BOUND, 
    "upperBound" -> UPPER_BOUND, 
    "numPartitions" -> NUM_PARTITION 
) 

} 
+0

您能否在您的回答中提供更多detials(如重新分配之前和之後的部分)我也面臨類似的問題,您在這方面的幫助將不勝感激 - 謝謝 – user2359997

+0

@ user2359997更新了我的答案,具體取決於表的大小 - 您可以指定分區的數量,以便每個執行程序可以並行化數據的提取。 –