我試圖用PyMongo連接器保存一個Spark-DataFrame。 以下是我的代碼,但每一次我運行代碼我得到一個錯誤:使用Spark-DataFrame保存HDFS到MongoDB
java.io.IOException: No FileSystem for scheme: mongodb
下面是我的代碼:
import pymongo
import pymongo_spark
pymongo_spark.activate()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv"
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
collections=df.collect()
df.write.format('mongodb://localhost:27017/test.sales_order_2').save()
我有一個很天真的代碼,因爲我是個新手,這,但對此的任何幫助將不勝感激。即時通訊使用火花2.0.0,2.7.6的Python,MongoDB的:3.2.9
這是一個很好的解決方案。但是,我們可以在PySpark中使用此Spark連接器執行異常處理嗎?因爲,有可能數據框可以輕鬆地超過MongoDB的文檔大小限制16MB –
您可以隨時將其放在'try/except'語句中。請注意,CSV行將是單個文檔,而不是整個CSV成爲單個文檔。另請參閱[MongoDB文檔]的定義(https://docs.mongodb.com/manual/core/document/)。如果CSV行值超過16MB,則可能需要重新考慮架構/模型。 –