我正在使用hadoop map-reduce來處理XML文件。我直接將JSON數據存儲到mongodb中。
如何才能實現在執行BulkWriteOperation
之前只有非重複的記錄將被存儲到數據庫中?使用hadoop減速器檢查重複記錄,同時將BulkWriteOperation轉換爲mongo
重複記錄標準將根據產品形象和產品名稱,我不想使用層嗎啡的,我們可以指定索引的類成員。
這裏是我的減速機類:
public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{
private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);
protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
LOGGER.info("reduce()------Start for key>"+key);
Map<String,String> insertProductInfo = new HashMap<String,String>();
try{
MongoClient mongoClient = new MongoClient("localhost", 27017);
DB db = mongoClient.getDB("test");
BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
for (MapWritable entry : values) {
for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
}
if(!insertProductInfo.isEmpty()){
BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
operation.insert(basicDBObject);
}
}
//How can I check for duplicates before executing bulk operation
operation.execute();
LOGGER.info("reduce------end for key"+key);
}catch(Exception e){
LOGGER.error("General Exception in XMLReducer",e);
}
}
}
編輯:
BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
.append("product_name", basicDBObject.get("product_name"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
operation.insert(basicDBObject);
我收到錯誤,如:com.mongodb.MongoInternalException: no mapping found for index 0
任何幫助建議的答案我已經加入後將是有用的。謝謝。
我沒有得到如何創建查詢?我想檢查'basicdbobject'是否包含具有許多文件的'insertProductInfo'。從產品名稱和產品圖像是檢查重複記錄是否符合我的標準? – Nakul91
@ Nakul91應該只有一個「有限」數量的字段才能真正形成重複。然後,如果整個內容永遠不會存在多次,那麼只需使用所插入的對象作爲查詢的完整內容即可。如果你真的不確定,就問另一個問題。但是你應該在你認爲「獨一無二」的字段上定義一個「唯一」索引。 –
唯一性僅在這兩個字段上定義。我已經在集合上的這些字段中添加了複合索引,但是當對5000個記錄使用.initializeUnOrderedBulkOperation()進行批量插入時,如果第1001條記錄是重複的,那麼它會拋出異常,剩餘的4000條記錄不會被添加到數據庫中。根據你它不會錯誤的重複,但它仍然是一個錯誤 – Nakul91