-1
我是Spark和Scala的新手,現在我不知何故卡住了一個問題:如何通過字段名稱處理每行的不同字段,然後放入新的rdd
。Spark Rdd按字段名稱處理每一行的不同字段
這是我的僞代碼:
val newRdd = df.rdd.map(x=>{
def Random1 => random(1,10000) //pseudo
def Random2 => random(10000,20000) //pseduo
x.schema.map(y=> {
if (y.name == "XXX1")
x.getAs[y.dataType](y.name)) = Random1
else if (y.name == "XXX2")
x.getAs[y.dataType](y.name)) = Random2
else
x.getAs[y.dataType](y.name)) //pseduo,keeper the same
})
})
有上述2級少的錯誤:
- 第二張地圖, 「x.getAs」 是一個錯誤的語法
- 如何進入一個新的
rdd
我在網上搜索很長一段時間。但沒用。請幫助或嘗試提供一些想法如何實現這一點。
謝謝Ramesh Maharjan,它現在有效。
def randomString(len: Int): String = {
val rand = new scala.util.Random(System.nanoTime)
val sb = new StringBuilder(len)
val ab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
for (i <- 0 until len) {
sb.append(ab(rand.nextInt(ab.length)))
}
sb.toString
}
def testUdf = udf((value: String) =>randomString(2))
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone")))
df.withColumn("_2", testUdf(df("_2")))
+---+---+
| _1| _2|
+---+---+
| 1| F3|
| 2| Ag|
+---+---+
感謝您的快速回復做,但我希望得到一個新的RDD其中c更改特殊的RDD檔案和其他領域保持不變。 – meng
是的,我也認爲「udf」。但就我而言,每一個特殊的領域都是不同的。 withColumn會爲特殊的字段提供相同的內容。 – meng
'withColumn'會給出使用'udf函數'返回的結果。所以你可以爲'udf'中的不同字段「匹配個案」,以獲得不同的值。 –