輸入火花數據幀的

格式轉換以dat​​aframe1通過分割它

+-----+---------------+------------------------------------------------------------------------------------------------------------------+ 
|table| err_timestamp|     err_message                      | 
+-----+---------------+------------------------------------------------------------------------------------------------------------------+ 
| t1|7/26/2017 13:56|[error = RI_VIOLATION, field = user_id, value = 'null']               | 
| t2|7/26/2017 13:58|[error = NULL_CHECK, field = geo_id, value = 'null'] [error = DATATYPE_CHECK, field = emp_id, value = 'FIWOERE8'] | 
+-----+---------------+------------------------------------------------------------------------------------------------------------------+ 

into the output dataframe2 shown below, transposing each error entry in the column into its own row and creating new columns from the existing column.

+-----+--------------+---------+--------------+---------+ 
|table|      err_date|err_field|      err_type|err_value| 
+-----+--------------+---------+--------------+---------+ 
|   t1|7/26/2017 0:00|  user_id|  RI_VIOLATION|     null| 
|   t2|7/26/2017 0:00|   geo_id|    NULL_CHECK|     null| 
|   t2|7/26/2017 0:00|   emp_id|DATATYPE_CHECK| FIWOERE8| 
+-----+--------------+---------+--------------+---------+ 

Seriously? What are you trying to ask? – ayush


Hi, I want to convert dataframe1 to dataframe2 without using Spark SQL, please help me – prakash


It is very important to me, please provide a solution – prakash

Answer


Here is a solution that does what you need; in some cases you could still reduce the number of steps.

import spark.implicits._ 
import org.apache.spark.sql.functions._ 

//create dummy data 
val df = spark.sparkContext.parallelize(Seq(
    ("t1", "7/26/2017 13:56", "[error = RI_VIOLATION, field = user_id, value = null]"), 
    ("t2", "7/26/2017 13:58", "[error = NULL_CHECK, field = geo_id, value = null] [error = DATATYPE_CHECK, field = emp_id, value = FIWOERE8]") 
)).toDF("table", "err_timestamp", "err_message") 

//create a udf to split the message string into an array of strings 
val splitValue = udf ((value : String) => { 
    "\\[(.*?)\\]".r.findAllMatchIn(value) 
    .map(x => x.toString().replaceAll("\\[", "").replaceAll("\\]", "")) 
    .toSeq 
}) 
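
For reference, a quick sanity check of the regex outside Spark (plain Scala, using the second sample message from the dummy data above) shows that it captures each bracketed segment; taking group(1) of each match is equivalent to the replaceAll calls in the udf:

val sample = "[error = NULL_CHECK, field = geo_id, value = null] [error = DATATYPE_CHECK, field = emp_id, value = FIWOERE8]" 

//group(1) is the text between the brackets, without the brackets themselves 
"\\[(.*?)\\]".r.findAllMatchIn(sample).map(_.group(1)).toSeq 
//Seq("error = NULL_CHECK, field = geo_id, value = null", 
//    "error = DATATYPE_CHECK, field = emp_id, value = FIWOERE8") 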

//explode the array so each error message gets its own row 
val df1 = df.withColumn("err_message", explode(splitValue($"err_message"))) 

df1.show(false) 
+-----+---------------+--------------------------------------------------------+ 
|table|err_timestamp |err_message            | 
+-----+---------------+--------------------------------------------------------+ 
|t1 |7/26/2017 13:56|error = RI_VIOLATION, field = user_id, value = null  | 
|t2 |7/26/2017 13:58|error = NULL_CHECK, field = geo_id, value = null  | 
|t2 |7/26/2017 13:58|error = DATATYPE_CHECK, field = emp_id, value = FIWOERE8| 
+-----+---------------+--------------------------------------------------------+ 

val splitExpr = split($"err_message", ",") 

//create three new columns by splitting each key = value pair 
df1.withColumn("err_field", split(splitExpr(1), "=")(1)) 
    .withColumn("err_type", split(splitExpr(0), "=")(1)) 
    .withColumn("err_value", split(splitExpr(2), "=")(1)) 
    .drop("err_message") 
    .show(false) 

Output:

+-----+---------------+---------+---------------+---------+ 
|table|err_timestamp |err_field|err_type  |err_value| 
+-----+---------------+---------+---------------+---------+ 
|t1 |7/26/2017 13:56| user_id | RI_VIOLATION | null | 
|t2 |7/26/2017 13:58| geo_id | NULL_CHECK | null | 
|t2 |7/26/2017 13:58| emp_id | DATATYPE_CHECK| FIWOERE8| 
+-----+---------------+---------+---------------+---------+ 
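
Note that the positional splits keep the surrounding whitespace, which is why the cells above look padded. If you need clean values, one option (a small variation on the code above, using the built-in trim from org.apache.spark.sql.functions) is:

df1.withColumn("err_field", trim(split(splitExpr(1), "=")(1))) 
    .withColumn("err_type", trim(split(splitExpr(0), "=")(1))) 
    .withColumn("err_value", trim(split(splitExpr(2), "=")(1))) 
    .drop("err_message") 
    .show(false) 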

Hope this helps!
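
As a side note, here is a minimal sketch of the same extraction without the udf-based splits, using the built-in regexp_extract and assuming the key = value layout is always exactly as in the sample data:

df1.withColumn("err_type", regexp_extract($"err_message", "error = ([^,]+)", 1)) 
    .withColumn("err_field", regexp_extract($"err_message", "field = ([^,]+)", 1)) 
    .withColumn("err_value", regexp_extract($"err_message", "value = ([^,]+)", 1)) 
    .drop("err_message") 
    .show(false) 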


Thanks Shankar, great solution – prakash
