我正在使用spark和scala進行項目工作,我對兩者都很陌生,但有很多來自stackoverflow的幫助我已完成所有數據處理並將處理後的數據存儲在mysql中。現在我終於面臨一個問題,我不明白如何解決它。第一次當我處理數據時,我使用這種方法存儲數據幀,第一次表是空的。使用Scala更新Spark數據庫中的數據
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
假設我處理的數據在數據庫中看起來像這樣。
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 24 2016-04-27 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
現在處理這個新的數據後,命名爲「根」與EID「C111」訪問辦公室2倍「二零一六年八月三十日00:00:00」的人,現在我只需要更新的這個人紀錄數據庫。我將如何做到這一點。現在更新的表應該看起來像這樣。
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 26 2016-08-30 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
我有數據萬人在此表中,如果我加載全表火花數據幀和更新所需的記錄,然後將需要更多的時間並且也沒有意義,因爲爲什麼我加載全表當我想更新只有一行。我試過這個代碼,但它將新行添加到表,而不是更新行。
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
有什麼辦法可以做到火花?
我在互聯網上看到過這個,我可以這樣做來更新。
val numParallelInserts = 10
val batchSize = 1000
new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
val db = connect()
val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
val stmt = db.prepareStatement(sql)
iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
batch foreach { session =>
stmt.setString(1, session.id)
stmt.setString(2, TimestampFormat.print(session.ts))
stmt.addBatch()
}
stmt.executeBatch()
db.commit();
logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
}
db.close();
你用 「覆蓋」 模式試過嗎? – dsr301
覆蓋重新創建具有不確切數據類型的表,並刪除所有較舊的數據,並只插入新處理的數據。 –