2016-10-10 137 views
1

我試圖用PyMongo連接器保存一個Spark-DataFrame。 以下是我的代碼,但每一次我運行代碼我得到一個錯誤:使用Spark-DataFrame保存HDFS到MongoDB

java.io.IOException: No FileSystem for scheme: mongodb 

下面是我的代碼:

import pymongo 
import pymongo_spark 
pymongo_spark.activate() 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc) 
from pyspark.sql import SparkSession 
from pyspark.sql import SparkSession 
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv" 
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path) 
collections=df.collect() 
df.write.format('mongodb://localhost:27017/test.sales_order_2').save() 

我有一個很天真的代碼,因爲我是個新手,這,但對此的任何幫助將不勝感激。即時通訊使用火花2.0.0,2.7.6的Python,MongoDB的:3.2.9

回答

1

I'm trying to save a Spark-DataFrame using PyMongo connector

你可以嘗試使用MongoDB Connector for Spark。使用Apache Spark v2.0.x,Python的v2.7.x和MongoDB爲V3.2.x您的安裝環境中,你可以做一些象下面這樣:

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Application Name").getOrCreate() 
dataframe = spark.read.csv("path/to/file.csv", header=True, mode="DROPMALFORMED") 
dataframe.write.format("com.mongodb.spark.sql.DefaultSource")\ 
       .option("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection")\ 
       .save() 

Python示例文件的完整版本可以在MongoDB PySpark Docker: examples.py找到。其中包括一個在Spark中使用MongoDB Aggregation的示例,以及Spark SQL

如果您熟悉docker,您可以使用docker-compose執行git項目MongoDB PySpark Docker並運行一些PySpark示例。

您可能會發現下面的資源非常有用:

+0

這是一個很好的解決方案。但是,我們可以在PySpark中使用此Spark連接器執行異常處理嗎?因爲,有可能數據框可以輕鬆地超過MongoDB的文檔大小限制16MB –

+0

您可以隨時將其放在'try/except'語句中。請注意,CSV行將是單個文檔,而不是整個CSV成爲單個文檔。另請參閱[MongoDB文檔]的定義(https://docs.mongodb.com/manual/core/document/)。如果CSV行值超過16MB,則可能需要重新考慮架構/模型。 –

相關問題