2016-12-09 34 views
5

我有一個嵌套的NDJ(新行分隔的JSON)文件,我需要讀入一個spark數據框並保存到實木複合地板。在試圖渲染架構我用這個函數:將龐大的JSON文件讀入Spark Dataframe

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { 
     schema.fields.flatMap(f => { 
      val colName = if (prefix == null) f.name else (prefix + "." + f.name) 
      f.dataType match { 
      case st: StructType => flattenSchema(st, colName) 
      case _ => Array(col(colName)) 
      } 
     }) 
    } 

上是由

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

讀返回數據幀我也切換這val df = spark.read.json(path)所以,這只是作品與NDJs,而不是多行JSON - 同樣的錯誤。

這是對工人 java.lang.OutOfMemoryError: Java heap space引起內存不足的錯誤。

我已經改變了JVM的內存選項和火花執行人/驅動器選項無濟於事

有沒有辦法來流的文件,扁平化架構,並添加到數據幀遞增? JSON的某些行包含來自前面條目的新字段......所以稍後需要填寫這些字段。

回答

0

您可以通過多種方式實現此目的。

首先在閱讀時,您可以提供數據框的架構來讀取json,或者您可以允許spark自己推斷架構。

一旦json在數據框中,您可以按照以下方法將其平坦化。

a。在數據框上使用explode() - 使其變平。 b。使用spark sql並使用訪問嵌套字段。運營商。您可以找到示例here

最後,如果要將新列添加到數據幀 a。第一個選項,使用withColumn()是一種方法。但是,這將針對每個添加的新列和整個數據集進行。 b。使用sql從現有生成新的數據框 - 這可能是最簡單的 c。最後,使用地圖,然後訪問元素,讓老模式,增加新的價值,創造新模式,並最終得到了新的DF - 如下

一個withColumn將在整個RDD工作。因此,對於要添加的每個列使用該方法通常不是一個好習慣。有一種方法可以處理地圖函數中的列及其數據。由於一個映射函數在這裏完成這項工作,所以添加新列及其數據的代碼將並行完成。

a。您可以根據計算收集新值。

b。添加這些新的列值到主RDD如下

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

這裏行,是行的地圖方法

c中的參考。如下創建新模式

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d。添加到舊模式

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e。用新的列創建新的數據幀

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

它如何解決由'wholeTextFiles'產生的'java.lang.OutOfMemoryError'? –

+0

我正在處理「是否有一種方法可以對文件進行流式處理,將模式平滑並逐步添加到數據框中?JSON的某些行包含來自前面條目的新字段......所以稍後需要填寫這些字段。 」。我沒有看到關於內存問題解決的問題。所以給了他多種方法。 – Ramzy

+0

如果NDJ是JSONL,那麼OP不應該使用wholeTextFiles。如果不是這不會有幫助。 –

2

沒有工作。這個問題與JVM對象限制有關。我最終使用了一個scala json解析器並手動構建了數據框。