2017-01-11 43 views
3

有沒有簡單的方法將給定的Row對象轉換爲json?如何將行轉換爲Spark 2中的json Scala

發現這個約一整個數據幀轉換成JSON輸出: Spark Row to JSON

但我只是想將一個行轉換成JSON。 這是我正在嘗試做的僞代碼。

更確切地說,我正在讀取json作爲Dataframe中的輸入。 我正在生成一個主要基於列的新輸出,但是對於不適合列的所有信息使用一個json字段。

我的問題是什麼來寫這一功能的最簡單的方法:convertRowToJson()

def convertRowToJson(row: Row): String = ??? 

def transformVenueTry(row: Row): Try[Venue] = { 
    Try({ 
    val name = row.getString(row.fieldIndex("name")) 
    val metadataRow = row.getStruct(row.fieldIndex("meta")) 
    val score: Double = calcScore(row) 
    val combinedRow: Row = metadataRow ++ ("score" -> score) 
    val jsonString: String = convertRowToJson(combinedRow) 
    Venue(name = name, json = jsonString) 
    }) 
} 

Psidom的解決方案:

def convertRowToJSON(row: Row): String = { 
    val m = row.getValuesMap(row.schema.fieldNames) 
    JSONObject(m).toString() 
} 

只能如果該行只有不帶嵌套行的一個級別。這是模式:

StructType(
    StructField(indicator,StringType,true), 
    StructField(range, 
    StructType(
     StructField(currency_code,StringType,true), 
     StructField(maxrate,LongType,true), 
     StructField(minrate,LongType,true)),true)) 

也試過阿爾喬姆的想法,但是沒有編譯:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = { 
    val sparkContext = sqlContext.sparkContext 
    import sparkContext._ 
    import sqlContext.implicits._ 
    import sqlContext._ 
    val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) 
    val dataFrame = rowRDD.toDF() //XXX does not compile 
    dataFrame 
} 

回答

1

我需要閱讀json輸入並生成json輸出。 大多數字段都是單獨處理的,但只需保留一些json子對象。

當Spark讀取一個數據幀時,它將一條記錄變成一個行。 Row是一個類似json的結構。這可以轉化成json。

但我需要採取一些子json結構出來的字符串用作一個新的領域。

這可以是這樣的:

dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address")) 

location.address是路徑到傳入的JSON基於數據幀的子JSON對象。 address_json是將該對象的列名轉換爲json的字符串版本。

to_json是Spark 2.1實現。

如果生成使用json4s address_json它輸出JSON應該解釋爲一個AST表示否則輸出JSON將具有address_json部分逃出。

1

本質上講,你可以有一個包含只是一排數據幀。因此,您可以嘗試過濾您的初始數據框,然後將其解析爲json。

+0

感謝您的建議。我想你的做法: 高清row2DataFrame(行:行,sqlContext:SQLContext):數據幀= { VAL sparkContext = sqlContext.sparkContext 進口sparkContext._ 進口sqlContext.implicits._ 進口sqlContext._ VAL rowRDD: RDD [行] = sqlContext.sparkContext.makeRDD(行::無) VAL數據幀= rowRDD.toDF()// XXX不編譯 數據幀 } 它沒有編譯。 –

5

您可以使用getValuesMap到行對象轉換爲一個Map然後將其轉換JSON:

import scala.util.parsing.json.JSONObject 
import org.apache.spark.sql._ 

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")  
val row = df.first()   // this is an example row object 

def convertRowToJSON(row: Row): String = { 
    val m = row.getValuesMap(row.schema.fieldNames) 
    JSONObject(m).toString() 
} 

convertRowToJSON(row) 
// res46: String = {"A" : 1, "B" : 2, "C" : 3} 
+0

這很好。謝謝! –

+2

更正: 它實際上只用於地圖/結構的第一級工作,而不是嵌套的地圖,你將只能看到值沒有鑰匙。 –

+1

@SamiBadawi在哪裏可以找到嵌套Map的解決方案? –

1

JSON有模式,但行沒有一個模式,所以你需要對行&應用架構轉換爲JSon。這是你如何做到的。

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._ 

def convertRowToJson(row: Row): String = { 

    val schema = StructType(
     StructField("name", StringType, true) :: 
     StructField("meta", StringType, false) :: Nil) 

     return sqlContext.applySchema(row, schema).toJSON 
} 
0

我結合了Artem,KiranM和Psidom的建議。做了很多創新和錯誤以及與此解決方案,我嵌套結構測試想出了:

def row2Json(row: Row, sqlContext: SQLContext): String = { 
    import sqlContext.implicits 
    val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) 
    val dataframe = sqlContext.createDataFrame(rowRDD, row.schema) 
    dataframe.toJSON.first 
} 

該解決方案的工作,但只有同時驅動模式下運行。

1

注重斯卡拉類scala.util.parsing.json.JSONObject已被棄用,不支持空值。

@deprecated( 「本課程將被刪除。」, 「2.11.0」)

「JSONFormat.defaultFormat不處理空值」

https://issues.scala-lang.org/browse/SI-5092

+0

謝謝阿農。 關於現代化Scala中的json支持已經有一些討論。 –

0

我有同樣的問題,我用標準模式(無數組)實現了parquet文件,而我只想獲取json事件。我做了如下,它似乎工作得很好(火花2.1):

import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.{DataFrame, Dataset, Row} 
import scala.util.parsing.json.JSONFormat.ValueFormatter 
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject} 

def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = { 
    schema.fields.map { 
    field => 
     try{ 
     if (field.dataType.typeName.equals("struct")){ 
      field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType]) 
     }else{ 
      field.name -> row.getAs[T](field.name) 
     } 
     }catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}} 
    }.filter(xy => xy._2 != null).toMap 
} 

def convertRowToJSON(row: Row, schema: StructType): JSONObject = { 
    val m: Map[String, Any] = getValuesMap(row, schema) 
    JSONObject(m) 
} 
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter) 
val defaultFormatter : ValueFormatter = (x : Any) => x match { 
    case s : String => "\"" + JSONFormat.quoteString(s) + "\"" 
    case jo : JSONObject => jo.toString(defaultFormatter) 
    case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter) 
    case ja : JSONArray => ja.toString(defaultFormatter) 
    case other => other.toString 
} 

val someFile = "s3a://bucket/file" 
val df: DataFrame = sqlContext.read.load(someFile) 
val schema: StructType = df.schema 
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema)) 
相關問題