2017-06-15 32 views
0

我正在使用PySpark進行一些大規模處理並將一些結果保存到MongoDB實例。我正在使用mongo-spark-connector_2.11-2.0.0.jar將數據框寫入MongoDB。如何處理PySpark中的MongoDB異常?

df.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection").mode("overwrite").save() 

有時候我例外,例如ConnectionExceptionMongoCommandException等。所以我想處理這些例外。所以我添加了這些異常處理代碼片段,但是我得到了ImportError: No module named com.mongodb

try: 
    df.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection").mode("overwrite").save() 
except MongoCommandException: 
    err_code = MongoCommandException.getErrorCode() 
    if err_code == int(16): 
     print "Request size is too large to write to Mongo" 

所以能有人對你幫助我如何使用mongo-spark-connector_2.11-2.0.0.jar

回答

1

來處理PySpark例外由於其中PySpark是利用了Java jar執行堆棧,你使用的是什麼/眼見實際上是Java圖書館。 這就是爲什麼你無法訪問PySpark中的com.mongodb庫。

你可以從py4j

from py4j.protocol import Py4JJavaError 

try: 
    df.write.format("com.mongodb.spark.sql.DefaultSource") 
      .option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection") 
      .mode("overwrite").save() 
except Py4JJavaError, ex: 
    print(ex.java_exception.toString()) 
    # analyse error stack and handle as needed. 

但是做什麼,捕捉異常見MongoDB Java MongoException class所有直接子類,以查看可用的例外處理。

+0

有趣。謝謝! –