2017-07-05 18 views
0

我有一個使用mongoexport從mongodb導出數據的過程。 由於documentation提到的所有JSON輸出是嚴格模式Mongoexport嚴格json在Spark中的加載

這意味着數據將是這樣的:

"{amount":{"$numberLong":"3"},"count":{"$numberLong":"245"}} 

凡爲我的Scala的情況下,類定義爲:

case class MongoData(amount: Long, count: Long) 

讀取數據當然會失敗,如下所示:

spark 
     .read 
     .json(inputPath) 
     .as[MongoData] 

有沒有辦法要麼從蒙戈出口沒有嚴格的模式或導入JSON Scala中沒有的每個字段手動重組到適當的結構?

回答

0

我現在正在使用它作爲解決方案。但感覺有點不好意思。

case class DataFrameExtended(dataFrame: DataFrame) { 

    def undoMongoStrict(): DataFrame = { 
    val numberLongType = StructType(List(StructField("$numberLong", StringType, true))) 

    def restructure(fields: Array[StructField], nesting: List[String] = Nil): List[Column] = { 
     fields.flatMap(field => { 
     val fieldPath = nesting :+ field.name 
     val fieldPathStr = fieldPath.mkString(".") 
     field.dataType match { 
      case dt: StructType if dt == numberLongType => 
      Some(col(s"$fieldPathStr.$$numberLong").cast(LongType).as(field.name)) 
      case dt: StructType => 
      Some(struct(restructure(dt.fields, fieldPath): _*).as(field.name)) 
      case _ => Some(col(fieldPathStr).as(field.name)) 
      //    case dt:ArrayType => //@todo handle other DataTypes Array?? 
     } 
     }) 
    }.toList 


    dataFrame.select(restructure(dataFrame.schema.fields): _*) 
    } 
} 

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = { 
    DataFrameExtended(df) 
} 

spark 
    .read 
    .json(inputPath) 
    .undoMongoStrict() 
+1

'mongoexport'被設計成產生JSON可roundtripped。對此的要求是它不會丟失類型信息。鑑於輸入的性質,我認爲您的解決方案運作良好。 – Ross

+0

是否有其他的開源(cli)工具,允許從mongo出口沒有嚴格的選項? –

+0

然而,由於[DRIVERS-342](https://jira.mongodb.org/browse/DRIVERS-342)的結果,目前還沒有一個輕鬆風格的Json規範正在被批准。一旦設置,所有支持的驅動程序將更新並添加新的寬鬆設置。我不確定當前是否有服務器支持它的計劃。 – Ross