2017-03-07 88 views
3

我有寫RDD數據MongoDB的火花應用程序,我得到一個MongoBulkWriteException。以前,我在使用MongoDB標準驅動程序中的bulkWrite()方法,但我已經開始使用MongoSpark驅動程序中的write()方法。MongoSpark節省重複鍵錯誤E11000

別的之前,我使用的Apache 星火1.6.0MongoDB的3.2.11

此異常跟蹤:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server 
10.1.101.146:27017. Write errors: [BulkWriteError{index=0, code=11000, 
message='E11000 duplicate key error collection: collection-test 
index: _id_ dup key: { : "636253651-2017-03-07" }', details={ }}] 

產生它的代碼是:

JavaRDD<Document> rddInsertRecords = rddGrouped.map(new Function<Tuple2<String, BasicRecord>, Document>() { 
private static final long serialVersionUID = 1L; 
    @Override 
    public Document call(Tuple2<String, BasicRecord> tuple2) throws Exception { 
      Document json = tuple2._2.toBSONDocument(); 
      return json; 
     } 
}); 
MongoSpark.save(rddInsertRecords, WriteConfig.create(sc.getConf())); 

我用我的舊代碼的替代解決方案,但我想用MongoSpark寫。

我在MongoDB的JIRA(https://jira.mongodb.org/browse/SERVER-14322)中看到過這個問題,但我不確定我怎麼能繞過這個問題。

UPDATE:我忘了提及第一次沒有發生故障(即沒有mongodb上的數據,集合爲空)。第二次運行作業時失敗。從技術上講,司機應該做一個補充,我是對的嗎?

回答

2

Spark連接器不知道如何插入RDD<T>其中T可以是任何類型 - 它如何獲得id值?

然而,數據集/ DataFrames具有與它們指示哪個字段是_id字段並能自動被用於upserts模式信息。這在SPARK-66完成。 Datasets/DataFrames的另一個好處是它們更高效,並且可以提高Spark作業的性能。

如果你必須使用RDD的,那麼你可以通過編程訪問MongoDB的收集,並通過MongoConnector類創建一個更新插入操作。

+0

羅傑那。我不能將RDD切換到數據集,所以我想我會使用MongoConnector方法。感謝您的澄清。 – cabreracanal

相關問題