2015-06-26 15 views
1

我正在使用hadoop map-reduce來處理XML文件。我直接將JSON數據存儲到mongodb中。
如何才能實現在執行BulkWriteOperation之前只有非重複的記錄將被存儲到數據庫中?使用hadoop減速器檢查重複記錄,同時將BulkWriteOperation轉換爲mongo

重複記錄標準將根據產品形象和產品名稱,我不想使用層嗎啡的,我們可以指定索引的類成員。

這裏是我的減速機類:

public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{ 

private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);  

protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{ 
    LOGGER.info("reduce()------Start for key>"+key); 
    Map<String,String> insertProductInfo = new HashMap<String,String>(); 
    try{ 
     MongoClient mongoClient = new MongoClient("localhost", 27017); 
     DB db = mongoClient.getDB("test"); 
     BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation(); 
     for (MapWritable entry : values) { 
      for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) { 
        insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString()); 
       } 
      if(!insertProductInfo.isEmpty()){ 
       BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo); 
       operation.insert(basicDBObject); 
      }   
     } 
     //How can I check for duplicates before executing bulk operation 
     operation.execute(); 
     LOGGER.info("reduce------end for key"+key); 
    }catch(Exception e){ 
     LOGGER.error("General Exception in XMLReducer",e); 
    } 
    } 
} 

編輯:

BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image")) 
       .append("product_name", basicDBObject.get("product_name")); 
       operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject)); 
operation.insert(basicDBObject); 

我收到錯誤,如:com.mongodb.MongoInternalException: no mapping found for index 0

任何幫助建議的答案我已經加入後將是有用的。謝謝。

回答

1

我想這一切都取決於你真正想用「重複」來處理你如何處理它。

對於您可以隨時使用.initializeUnOrderedBulkOperation()這將不會從您的索引(它需要停止重複項)的重複鍵「錯誤」,但會報告返回的BulkWriteResult對象中的任何此類錯誤。這是從.execute()

BulkWriteResult result = operation.execute(); 

在另一方面回來,你可以用「upserts」來代替,而運營商如$setOnInsert只讓其中沒有重複的存在改變:

BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo); 
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key")); 

operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject)); 

所以你基本上查找持有「關鍵字」的字段的值以確定與查詢重複,然後僅實際更改未找到該「關鍵字」的任何數據,從而創建新文檔並「插入」。

在這兩種情況下,默認行爲都是「插入」第一個唯一的「鍵」值,然後忽略所有其他的發生。如果你想做其他事情,如找到相同密鑰的「覆蓋」或「增量」值,那麼012up「upsert」方法就是你想要的方法,但是你將使用其他update operators來執行這些操作。

+0

我沒有得到如何創建查詢?我想檢查'basicdbobject'是否包含具有許多文件的'insertProductInfo'。從產品名稱和產品圖像是檢查重複記錄是否符合我的標準? – Nakul91

+0

@ Nakul91應該只有一個「有限」數量的字段才能真正形成重複。然後,如果整個內容永遠不會存在多次,那麼只需使用所插入的對象作爲查詢的完整內容即可。如果你真的不確定,就問另一個問題。但是你應該在你認爲「獨一無二」的字段上定義一個「唯一」索引。 –

+0

唯一性僅在這兩個字段上定義。我已經在集合上的這些字段中添加了複合索引,但是當對5000個記錄使用.initializeUnOrderedBulkOperation()進行批量插入時,如果第1001條記錄是重複的,那麼它會拋出異常,剩餘的4000條記錄不會被添加到數據庫中。根據你它不會錯誤的重複,但它仍然是一個錯誤 – Nakul91