2017-02-22 17 views
0

在下面的代碼中,我嘗試使用選項映射在readConfig中傳遞mongo uri和數據庫。但它的錯誤,沒有找到uri或數據庫。如何在readConfig中爲Spark中的mongo db添加uri和數據庫?

`

public JavaMongoRDD<Document> getRDDFromDS(DataSourceInfo ds, String collectionName){ 
     String mongoDBURI = "mongodb://" 
       + PropertiesFileEncryptorUtil.decryptData(ds.getDbUsername()) + ":" 
       + PropertiesFileEncryptorUtil.decryptData(ds.getDbPassword()) + "@" 
       + ds.getHostName() + ":" + ds.getPort(); 
     Map<String, String> readOverrides = new HashMap<String, String>(); 
     readOverrides.put("uri", mongoDBURI); 
     readOverrides.put("database", ds.getDbName()); 
     readOverrides.put("collection", collectionName); 
     readOverrides.put("partitioner", mongoDBInputPartitioner); 
     readOverrides.put("partitionKey", mongoDBPartitionKey); 
     readOverrides.put("partitionSizeMB", mongoDBInputPartitionSize); 

     ReadConfig readConf = ReadConfig.create(jsc).withOptions(readOverrides); 
     JavaMongoRDD<Document> readRdd = MongoSpark.load(jsc, readConf); 
     return readRdd; 
    }` 

什麼是通過URI和數據庫的正確途徑。 在此先感謝。

+0

什麼版本的火花連接器您使用的是有幫助嗎?我認爲這個bug已經修復了。您也可以直接使用選項創建readConf。 – Ross

+0

我正在使用'org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc1',我需要更改這個,因爲我正在使用gradle,並在maven中找到了上面最新的一個上次爲100 MB大小的排序問題進行了修正。現在我查了一下,發現2.0.0是最新的一個,我用2.0.0試着讓你知道。 –

+0

@Ross,感謝您的快速響應,我正在使用2.0.0版本進行測試,請您幫我解釋如何在傳遞讀取配置的同時爲mongo客戶端傳遞ssl標誌。 –

回答

0

你可以通過配置參數通過配置變量火花:

val conf = new SparkConf().setAppName("YourAppName").setMaster("local[2]").set("spark.executor.memory","1g") 
     .set("spark.app.id","YourSparkId") 
     .set("spark.mongodb.input.uri","mongodb://127.0.0.1/yourdatabase.yourInputcollection?readPreference=primaryPreferred") 
     .set("spark.mongodb.output.uri","mongodb://127.0.0.1/yourdatabase.yourOutputcollection") 

你需要給配置變量火花上下文後認爲:

val sc = new SparkContext(conf) 

val readConf = ReadConfig(sc) 

然後你就可以讀出值mongo是這樣的:

val rdd = sc.loadFromMongoDB(readConfig = readConfig) 

並保存這樣:

rdd.map(someMapFunction).saveToMongoDB() 

我希望我的回答是

+0

感謝您的回覆。我和你在上一段代碼中提出的一樣。但是現在我們需要動態選擇託管在不同主機上的不同數據源,因此我需要在創建sparkcontext時將uri和數據庫作爲讀取配置傳遞給SparkConf,而不是傳遞給SparkConf。由於在一個應用程序設置中僅推薦單個sparkcontext,因此sparkcontext初始化中的uri似乎不適用於此用例。 –

+0

是否可以在sparkcontext初始化中提供uri和數據庫,然後通過readconfig覆蓋uri和數據庫。 –

+0

我不知道你的第二個問題,但如果我的第一個答案滿足你的問題,請接受/ upvote它。 –

相關問題