2017-05-17 79 views
-1

我是Spark和Scala的新手,現在我不知何故卡住了一個問題:如何通過字段名稱處理每行的不同字段,然後放入新的rddSpark Rdd按字段名稱處理每一行的不同字段

這是我的僞代碼:

val newRdd = df.rdd.map(x=>{ 
     def Random1 => random(1,10000) //pseudo 
     def Random2 => random(10000,20000) //pseduo 
     x.schema.map(y=> { 
      if (y.name == "XXX1") 
      x.getAs[y.dataType](y.name)) = Random1 
      else if (y.name == "XXX2") 
      x.getAs[y.dataType](y.name)) = Random2 
      else 
      x.getAs[y.dataType](y.name)) //pseduo,keeper the same 
     }) 
     }) 

有上述2級少的錯誤:

  1. 第二張地圖, 「x.getAs」 是一個錯誤的語法
  2. 如何進入一個新的rdd

我在網上搜索很長一段時間。但沒用。請幫助或嘗試提供一些想法如何實現這一點。


謝謝Ramesh Maharjan,它現在有效。

def randomString(len: Int): String = { 
    val rand = new scala.util.Random(System.nanoTime) 
    val sb = new StringBuilder(len) 
    val ab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 
    for (i <- 0 until len) { 
     sb.append(ab(rand.nextInt(ab.length))) 
    } 
    sb.toString 
    } 
def testUdf = udf((value: String) =>randomString(2)) 
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone"))) 
df.withColumn("_2", testUdf(df("_2"))) 
+---+---+ 
| _1| _2| 
+---+---+ 
| 1| F3| 
| 2| Ag| 
+---+---+ 

回答

1

如果您打算過濾某些felds「XXX1」「XXX2」那麼簡單select功能應該做的伎倆

,轉換至rdd
如果您打算別的東西,然後你的x.getAs應該看起來像下面

val random1 = x.getAs(y.name) 

看來,喲你企圖更改值某些列「XXX1」和「XXX2」
對於一個簡單的udf功能和withColumn應該做的伎倆
簡單udf功能如下

def testUdf = udf((value: String) => { 
    //do your logics here and what you return from here would be reflected in the value you passed from the column 
    }) 

而且你可以調用UDF功能

df.withColumn("XXX1", testUdf(df("XXX1"))) 

同樣可以爲「XXX2」

+0

感謝您的快速回復做,但我希望得到一個新的RDD其中c更改特殊的RDD檔案和其他領域保持不變。 – meng

+0

是的,我也認爲「udf」。但就我而言,每一個特殊的領域都是不同的。 withColumn會爲特殊的字段提供相同的內容。 – meng

+0

'withColumn'會給出使用'udf函數'返回的結果。所以你可以爲'udf'中的不同字段「匹配個案」,以獲得不同的值。 –