我跟着mongo-hadoop連接器的documentation。如何使用mongo-hadoop連接器使用spark來保存mongo集合中的數據?
我能夠將數據從inputCol
收集testDB
數據庫傳輸到outputCol
收集利用:
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/testDB.inputCol");
JavaSparkContext sc = new JavaSparkContext(sparkClient.sparkContext);
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.output.format",
"com.mongodb.hadoop.MongoOutputFormat");
outputConfig.set("mongo.output.uri",
"mongodb://localhost:27017/testDB.outputCol");
documents.saveAsNewAPIHadoopFile(
"file:///this-is-completely-unused",
Object.class,
BSONObject.class,
MongoOutputFormat.class,
outputConfig
);
我要救一個簡單的文件說
{"_id":1, "name":"dev"}
在outputCol
收集testDB
數據庫。
我該如何做到這一點?