2016-09-01 32 views
0

我正在使用spark和scala進行項目工作,我對兩者都很陌生,但有很多來自stackoverflow的幫助我已完成所有數據處理並將處理後的數據存儲在mysql中。現在我終於面臨一個問題,我不明白如何解決它。第一次當我處理數據時,我使用這種方法存儲數據幀,第一次表是空的。使用Scala更新Spark數據庫中的數據

 df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 

假設我處理的數據在數據庫中看起來像這樣。

 id  name  eid  number_of_visitis last_visit_date 
     1  John  C110  12     2016-01-13 00:00:00 
     2  Root  C111  24     2016-04-27 00:00:00 
     3  Michel  C112  8     2016-07-123 00:00:00 
     4  Jonny  C113  45     2016-06-10 00:00:00 

現在處理這個新的數據後,命名爲「根」與EID「C111」訪問辦公室2倍「二零一六年八月三十日00:00:00」的人,現在我只需要更新的這個人紀錄數據庫。我將如何做到這一點。現在更新的表應該看起來像這樣。

 id  name  eid  number_of_visitis last_visit_date 
     1  John  C110  12     2016-01-13 00:00:00 
     2  Root  C111  26     2016-08-30 00:00:00 
     3  Michel  C112  8     2016-07-123 00:00:00 
     4  Jonny  C113  45     2016-06-10 00:00:00 

我有數據萬人在此表中,如果我加載全表火花數據幀和更新所需的記錄,然後將需要更多的時間並且也沒有意義,因爲爲什麼我加載全表當我想更新只有一行。我試過這個代碼,但它將新行添加到表,而不是更新行。

 df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 

有什麼辦法可以做到火花?

我在互聯網上看到過這個,我可以這樣做來更新。

val numParallelInserts = 10 
val batchSize = 1000 

new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) => 
    val db = connect() 

    val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)" 
    val stmt = db.prepareStatement(sql) 

    iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) => 
    batch foreach { session => 
     stmt.setString(1, session.id) 
     stmt.setString(2, TimestampFormat.print(session.ts)) 
     stmt.addBatch() 
    } 
    stmt.executeBatch() 
    db.commit(); 
    logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements") 
    } 

    db.close(); 
+0

你用 「覆蓋」 模式試過嗎? – dsr301

+1

覆蓋重新創建具有不確切數據類型的表,並刪除所有較舊的數據,並只插入新處理的數據。 –

回答

0

你可以嘗試使用sql來做到這一點。將更新的(甚至是新的)數據存儲在新的臨時表中,然後將臨時表合併到主表中。要做到這一點

的一種方式是 -

  1. 更新所有主表使用臨時表

    update main_table set visits = main_table.visits + temp_table.visits from temp_table where main_table.eid = temp_table.eid;

  2. 刪除臨時表中的所有重複記錄(記錄葉色只有臨時表中的新記錄)

    delete from temp_table where main_table.eid = temp_table.eid;

  3. 插入從臨時表中的所有記錄到主表

    insert into main_table select * from temp_table;

  4. 刪除臨時表

    drop table temp_table;

+0

這是在db級別,並有更多的步驟,我想以最短的方式做到這一點。我有數以百萬計的數據,因此複製,刪除和插入需要時間。我更新了這個問題,請看看你能否理解我想要做的事情。 –

+0

我在紅字數據庫上用了幾十億行。 – Kakaji