2017-03-07

Spark 2.0 - Flatten a JSON file to CSV

I am trying to flatten a JSON file into a CSV, but I have not succeeded so far. This is the output I am trying to get:

OUTPUT: (screenshot of the desired flattened CSV table)

I tried the code below, but I do not know how to correctly manipulate the qualify column in Spark SQL and return the right values.

from pyspark.sql.functions import * 

dummy = spark.read.json('dummy-3.json') 
qualify = dummy.select("user_id", "rec_id", "uut", "hash", explode("qualify").alias("qualify")) 
qualify.show() 

+-------+------+---+------+--------------------+ 
|user_id|rec_id|uut| hash|    qualify| 
+-------+------+---+------+--------------------+ 
|  1|  2| 12|abc123|[cab321,test-1,of...| 
|  1|  2| 12|abc123|[cab123,test-2,of...| 
+-------+------+---+------+--------------------+ 

JSON example:

{
    "user_id": 1,
    "rec_id": 2,
    "uut": 12,
    "hash": "abc123",
    "qualify": [{
        "offer": "offer-1",
        "name": "test-1",
        "hash": "cab321",
        "qualified": false,
        "rules": [{
            "name": "name of rule 1",
            "approved": true,
            "details": {}
        },
        {
            "name": "name of rule 2",
            "approved": false,
            "details": {}
        }]
    }, {
        "offer": "offer-2",
        "name": "test-2",
        "hash": "cab123",
        "qualified": true,
        "rules": [{
            "name": "name of rule 1",
            "approved": true,
            "details": {}
        },
        {
            "name": "name of rule 2",
            "approved": false,
            "details": {}
        }]
    }]
}

JSON schema:

root 
|-- hash: string (nullable = true) 
|-- qualify: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- hash: string (nullable = true) 
| | |-- name: string (nullable = true) 
| | |-- offer: string (nullable = true) 
| | |-- qualified: boolean (nullable = true) 
| | |-- rules: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- approved: boolean (nullable = true) 
| | | | |-- name: string (nullable = true) 
|-- rec_id: long (nullable = true) 
|-- user_id: long (nullable = true) 
|-- uut: long (nullable = true) 

I tried converting the DataFrame to an RDD and writing a map function to return the values, but for some reason I think that is not a good approach. Am I wrong?

Has anyone worked on a similar problem?

Thanks for any help.
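For reference, here is a minimal non-Spark sketch of the flattening the question is after, run on the sample JSON above (pure Python; the field names come from the sample, while the flattened `qualify_*`/`rule_*` column names are my own choice):

```python
import json

# Sample record from the question (with its JSON syntax fixed).
record = json.loads("""
{
  "user_id": 1, "rec_id": 2, "uut": 12, "hash": "abc123",
  "qualify": [
    {"offer": "offer-1", "name": "test-1", "hash": "cab321", "qualified": false,
     "rules": [{"name": "name of rule 1", "approved": true, "details": {}},
               {"name": "name of rule 2", "approved": false, "details": {}}]},
    {"offer": "offer-2", "name": "test-2", "hash": "cab123", "qualified": true,
     "rules": [{"name": "name of rule 1", "approved": true, "details": {}},
               {"name": "name of rule 2", "approved": false, "details": {}}]}
  ]
}
""")

# Two nested "explodes": one output row per (qualify element, rule element) pair.
rows = [
    {
        "user_id": record["user_id"],
        "rec_id": record["rec_id"],
        "uut": record["uut"],
        "hash": record["hash"],
        "qualify_offer": q["offer"],
        "qualify_name": q["name"],
        "qualify_hash": q["hash"],
        "qualify_qualified": q["qualified"],
        "rule_name": r["name"],
        "rule_approved": r["approved"],
    }
    for q in record["qualify"]
    for r in q["rules"]
]

print(len(rows))  # 2 offers x 2 rules = 4 flattened rows
```

This mirrors what two chained `explode` calls do in Spark: the outer loop expands the `qualify` array, the inner loop expands each element's `rules` array, and the top-level scalar columns are repeated on every row.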


Have you tried putting 'qualified.*' into your select query instead of 'explode'? – Zyoma

Answers

import pyspark.sql.functions as f

qualify = dummy.withColumn('qualify', f.explode(dummy['qualify']))
result = qualify.withColumn('qualify_name', qualify['qualify']['name'])

You can access a field of a StructType() column either as a.b or as a['b'].


SOLUTION

I used the explode function, but created a new DataFrame for each explode.

from pyspark.sql.functions import col, explode, from_unixtime

df2 = df.select(col("userId").alias("user_id"),\
       col("recommendationId").alias("rec_id"),\ 
       col("utsId").alias("uts_id"),\ 
       col("gitHash").alias("git_hash"), \ 
       from_unixtime(col("createdAt")).alias("created"), \ 
       explode("qualifyResults").alias("qualify")) 

df3 = df2.select("user_id",\ 
       "rec_id",\ 
       "uts_id",\ 
       "git_hash",\ 
       "created",\ 
       col("qualify.offerId").alias("qualify_offer"),\ 
       col("qualify.qualifyName").alias("qualify_name"),\ 
       col("qualify.qualifyHash").alias("qualify_hash"),\ 
       col("qualify.qualified").alias("qualify_qualified"),\ 
       explode("qualify.rulesResult").alias("rules")) 

# the nested details field is dropped here
df4 = df3.select("user_id",\ 
       "rec_id",\ 
       "uts_id",\ 
       "git_hash",\ 
       "created",\ 
       "qualify_offer",\ 
       "qualify_name",\ 
       "qualify_hash",\ 
       "qualify_qualified",\ 
       col("rules.name").alias("rule_name"),\ 
       col("rules.approved").alias("rule_approved"),\ 
       col("rules.details").alias("rule_details")) 

Using this approach I was able to get the CSV format I wanted.
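To actually write the flattened frame out, Spark's own writer can be used directly, e.g. `df4.write.csv(path, header=True)`. Outside Spark, the same final step can be sketched with Python's standard csv module (the two rows below are made up for illustration, shaped like the df4 columns above):

```python
import csv
import io

# Hypothetical flattened rows, one per (qualify, rule) pair.
rows = [
    {"user_id": 1, "qualify_name": "test-1", "rule_name": "name of rule 1", "rule_approved": True},
    {"user_id": 1, "qualify_name": "test-1", "rule_name": "name of rule 2", "rule_approved": False},
]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(rows[0]))
writer.writeheader()   # header line, like header=True in Spark's writer
writer.writerows(rows)
print(buf.getvalue())
```

Because every nested array has already been exploded into plain scalar columns, each dict maps cleanly onto one CSV line.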