
Removing Some/None from a left-joined RDD in Spark

I'm running a left outer join on Spark RDDs, but sometimes I get output like this:

(k, (v, Some(w)))

(k, (v, None))

What do I need to do to get it back in this form instead:

(k, (v, (w)))

(k, (v,()))

Here is how I'm combining the two files:

def formatMap3(left: String = "", right: String = "")(m: String = "") = {
  val items = m.map { k => s"$k" }
  s"$left$items$right"
}



val combPrdGrp = custPrdGrp3.leftOuterJoin(cmpgnPrdGrp3) 

val combPrdGrp2 = combPrdGrp.groupByKey 

val combPrdGrp3 = combPrdGrp2.map { case (n, list) => 
    val formattedPairs = list.map { case (a, b) => s"$a $b" } 
    s"$n ${formattedPairs.mkString}" 
} 

Answers


If you are just interested in getting the formatted output without the Somes/Nones, then something like this should work:

val combPrdGrp3 = combPrdGrp2.map { case (n, list) =>
  val formattedPairs = list.map {
    case (a, Some(b)) => s"$a $b"
    case (a, None)    => s"$a,()"
  }
  s"$n ${formattedPairs.mkString}"
}
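
To make that concrete, here is a minimal sketch run against toy data (the RDD names and sample pairs below are invented for illustration, and sc is assumed to be an existing SparkContext, e.g. from spark-shell):

// Toy left and right RDDs keyed by an id (made-up data)
val custToy  = sc.parallelize(Seq(("c1", "p10"), ("c2", "p20"), ("c3", "p30")))
val cmpgnToy = sc.parallelize(Seq(("c1", "grpA"), ("c3", "grpC")))

// leftOuterJoin keeps every left key; unmatched right values come back as None
val joinedToy  = custToy.leftOuterJoin(cmpgnToy)   // RDD[(String, (String, Option[String]))]
val groupedToy = joinedToy.groupByKey              // RDD[(String, Iterable[(String, Option[String])])]

// Unwrap Some/None explicitly while formatting, as in the snippet above
val formattedToy = groupedToy.map { case (n, list) =>
  val pairs = list.map {
    case (a, Some(b)) => s"$a $b"
    case (a, None)    => s"$a,()"
  }
  s"$n ${pairs.mkString}"
}

formattedToy.collect().foreach(println)
// prints lines like: c1 p10 grpA   and   c2 p20,()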

If you have some other use in mind, you may need to provide more details.


In Spark, the leftOuterJoin() function returns a tuple containing the join key, the value from the left-side dataset, and an Option wrapping the right-side value. To pull the right-side value out of the Option, just call getOrElse() on it in the resulting RDD. For example:

scala> val rdd1 = sc.parallelize(Array(("k1", 4), ("k4", 7), ("k8", 10), ("k6", 1), ("k7", 4))) 
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:21 

scala> val rdd2 = sc.parallelize(Array(("k5", 4), ("k4", 3), ("k0", 2), ("k6", 5), ("k1", 6))) 
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:21 

scala> val rdd_join = rdd1.leftOuterJoin(rdd2).map { case (a, (b, c: Option[Int])) => (a, (b, (c.getOrElse()))) } 
rdd_join: org.apache.spark.rdd.RDD[(String, (Int, AnyVal))] = MapPartitionsRDD[18] at map at <console>:25 

scala> rdd_join.take(5).foreach(println) 
... 
(k4,(7,3)) 
(k6,(1,5)) 
(k7,(4,())) 
(k8,(10,())) 
(k1,(4,6))
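
A note on the inferred type above: the empty argument list in c.getOrElse() is adapted to c.getOrElse(()), i.e. the default is the Unit value (), so each right-hand value is either an Int or () and the element type widens to AnyVal. That is exactly the (k, (v, ())) shape the question asks for. If a typed sentinel were preferred instead, a variant along these lines should work (it reuses rdd1 and rdd2 from above; the -1 default is an arbitrary placeholder, not something from the original post):

// Same join, but substitute a sentinel Int so the value type stays (Int, Int)
val rdd_join_typed = rdd1.leftOuterJoin(rdd2).map {
  case (a, (b, c)) => (a, (b, c.getOrElse(-1)))   // -1 marks "no match on the right"
}
// rdd_join_typed: org.apache.spark.rdd.RDD[(String, (Int, Int))]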