這裏做
首先創建一個dataframe
import sqlContext.implicits._
val invalidrecords = Seq(
("Hello", 5),
("How", 3),
("World", 5)
).toDF("Data", "Count")
的一個簡單的方法,你應該有
+-----+-----+
|Data |Count|
+-----+-----+
|Hello|5 |
|How |3 |
|World|5 |
+-----+-----+
然後定義UDF功能
import org.apache.spark.sql.functions._
def appendDelimiterError = udf((data: String, count: Int) => "value with error")
你呼叫使用withColumn
作爲
invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)
你應該有輸出
+-----+-----+----------------+
|Data |Count|value |
+-----+-----+----------------+
|Hello|5 |value with error|
|How |3 |value with error|
|World|5 |value with error|
+-----+-----+----------------+
您可以從udf
功能
編輯寫你的邏輯,而不是返回一個字符串的
在下面的評論中回答您的要求將是必需的Ë您更改UDF功能和withColumn如下
def appendDelimiterError = udf((data: String, count: Int) => {
if(count < 5) s"convert value to ${data} - error"
else data
})
invalidrecords.withColumn("Data",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)
你應該有輸出
+----------------------------+-----+
|Data |Count|
+----------------------------+-----+
|Hello |5 |
|convert value to How - error|3 |
|World |5 |
+----------------------------+-----+
是的,它似乎 –
你需要確保你的UDF是類型的函數'是正確的方式(String,Int)=> String',例如。 –
只有你的列名是不同的 –