2017-04-19 117 views

Answers

0 votes

Replacing NA: you can try replacing "NA" with 0 as shown below, but note that this gives you a new RDD:

scala> val t= sc.parallelize(Seq(("100",List("2","-4","NA","6","8","2")))) 
t: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[0] at parallelize at <console>:21 
scala> val newRDD = t.map(x => (x._1,x._2.map{case "NA" => 0; case x => x })) 
newRDD: org.apache.spark.rdd.RDD[(String, List[Any])] = MapPartitionsRDD[3] at map at <console>:23 

scala> newRDD.collect 
res5: Array[(String, List[Any])] = Array((100,List(2, -4, 0, 6, 8, 2))) 
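One caveat with the snippet above (a sketch, not part of the original answer): because "NA" becomes the Int 0 while the other elements stay Strings, the result type is `List[Any]`. Mapping "NA" to the string "0" instead keeps the list homogeneous:

```scala
// Replace "NA" with the string "0" so the element type stays String
val newRDD2 = t.map { case (key, values) =>
  (key, values.map { case "NA" => "0"; case v => v })
}
// newRDD2 is RDD[(String, List[String])] instead of RDD[(String, List[Any])]
```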
0 votes

When a sequence is parallelized, Spark creates an RDD of the provided values and stores it across the cluster. RDDs are immutable by nature, so another approach is to filter the "NA" values out of the RDD, map them to Int and multiply each by zero, then union the result with the RDD of the non-"NA" elements.

Sample code:

val t = sc.parallelize(Seq(("100", List("2", "-4", "NA", "6", "8", "2"))))
// the "NA" elements of each list, mapped to 0
val a = t.map(i => i._2).map(i => i.filter(_ == "NA").map(_ => 0))
// the remaining elements of each list, parsed to Int
val b = t.map(i => i._2).map(i => i.filter(_ != "NA").map(_.toInt))
val d = a.union(b)
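Note that the union does not preserve the original element order. If order matters, a single per-element map (a sketch along the same lines, not from the original answer) avoids the split entirely:

```scala
// Replace each "NA" with 0 and parse the rest to Int, keeping list order
val ordered = t.map { case (key, values) =>
  (key, values.map(v => if (v == "NA") 0 else v.toInt))
}
```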
0 votes

If you only want to check the third element of the list (index 2), then use the following:

val t = spark.sparkContext.parallelize(Seq(("100", List("2", "-4", "NA", "6", "8", "2"))))
val updatedData = t.map(row => {
  val list = row._2
  val changed = if (list(2) == "NA") "0" else list(2)
  (row._1, List(list(0), list(1), changed, list(3), list(4), list(5)))
})

updatedData.collect().foreach(println)

The code above works, but I would suggest using a Dataset and returning a Row from the map instead.
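That suggestion could be sketched as follows (a sketch under assumptions: it presumes Spark 2.x with an active `spark` session, and keeps the tuple shape rather than building a full Row):

```scala
import spark.implicits._

// Build a Dataset instead of a raw RDD
val ds = Seq(("100", List("2", "-4", "NA", "6", "8", "2"))).toDS()

// Replace only the element at index 2 when it is "NA"
val updated = ds.map { case (key, values) =>
  (key, values.updated(2, if (values(2) == "NA") "0" else values(2)))
}

updated.collect().foreach(println)
```

`List.updated` returns a copy of the list with the element at the given index replaced, which is a little tidier than rebuilding the list element by element as above.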