2016-03-21 37 views
0

我使用Spark插入cassandra。如何知道行數使用Spark插入cassandra

CassandraJavaUtil.javaFunctions(newRDD) 
      .writerBuilder("dmp", "dmp_user_user_profile_spark1", mapToRow(UserSetGet.class)).saveToCassandra(); 
      logger.info("DataSaved"); 

我的問題是,如果RDD有5k行,並由於某種原因插入卡桑德拉,作業失敗。

會不會有回滾的插入出5K

這一點,如果沒有,我怎麼會知道多少行實際插入,這樣我可以從失效行重新開始我的工作行。

回答

1

簡單的回答,不,不會有自動回滾。

無論數據火花能夠保存到cassandra中,都會被保存到cassandra中。

不,沒有簡單的方法知道什麼數據集,火花工作能夠成功保存。事實上,我能想到的唯一方法是,從cassandra讀取數據,根據關鍵字從結果集中加入和過濾掉。

說實話,如果數據巨大,使得大量加入,這似乎是相當大的開銷。在大多數情況下,您可以簡單地重新運行該作業,並讓它再次保存到cassandra表中。 由於在cassandra中更新和插入的工作方式相同。這不會是一個問題。

只有在處理計數器表時纔會出現問題。

更新: 對於這種特定情況,您可以將rdd拆分成您的尺寸的批次,然後嘗試保存它們。 這樣,如果你在一個rdd失敗,你會知道哪個rdd失敗。如果沒有這個設定,你應該可以從下一個rdd拿起。

+0

嗨Abhishek謝謝你的答案,但這裏的問題是在表中有一列app_count,每增加一個更新。所以我需要知道失敗發生時更新了哪些行。 –

+0

更新了答案。或者你可能應該引入一個額外的列來跟蹤cassandra中的這種東西,可以是Date或甚至整數列。 –

+0

嗨Abhishek,我做了完全相同的事情,即將RDD拆分成固定大小的長度,但我的問題是,它會在插入時出現中途失敗,例如5000只有2000個插入,然後失敗。 。我的表是巨大的,所以我試圖引入一個額外的列(如AutoIncrement int),使用zipWithIndex。無論如何感謝您的答案 –

相關問題