2016-11-09 55 views
2

假設我有一個抽象類A。我也有類BC從類A繼承。Spark Scala:將子類型傳遞給接受父類型的函數

abstract class A { 
    def x: Int 
} 
case class B(i: Int) extends A { 
    override def x = -i 
} 
case class C(i: Int) extends A { 
    override def x = i 
} 

鑑於這些類中,我們構建了以下RDD:

val data = sc.parallelize(Seq(
     Set(B(1), B(2)), 
     Set(B(1), B(3)), 
     Set(B(1), B(5)) 
    )).cache 
     .zipWithIndex 
     .map {case(k, v) => (v, k)} 

我還具有以下功能得到一個RDD作爲輸入,並返回每個元素的計數:

def f(data: RDD[(Long, Set[A])]) = { 
    data.flatMap({ 
    case (k, v) => v map { af => 
     (af, 1) 
    } 
    }).reduceByKey(_ + _) 
} 

請注意,RDD正在接受類型A。現在,我希望val x = f(data)返回預期的計數,作爲B是子類型的A,但我得到以下編譯錯誤:

type mismatch; 
found : org.apache.spark.rdd.RDD[(Long, scala.collection.immutable.Set[B])] 
required: org.apache.spark.rdd.RDD[(Long, Set[A])] 
    val x = f(data) 

這個錯誤消失,如果我改變函數簽名f(data: RDD[(Long, Set[B])]);但是,我不能這樣做,因爲我想在RDD中使用其他子類(如C)。

我也曾嘗試以下方法:

def f[T <: A](data: RDD[(Long, Set[T])]) = { 
    data.flatMap({ 
    case (k, v) => v map { af => 
     (af, 1) 
    } 
    }) reduceByKey(_ + _) 
} 

然而,這也給了我以下運行時錯誤:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(T, Int)] 
possible cause: maybe a semicolon is missing before `value reduceByKey'? 
     }) reduceByKey(_ + _) 

我感謝有這方面的幫助。

+2

僅僅因爲B是A的子類型並不代表集[B]設置[A]的亞型。這是因爲'Set'是不變的。你需要確保你的集合是一個集合[A] – puhlen

回答

2

Set[T]T不變,這意味着給定亞型的BASet[A]不是亞型也不的Set[B] RDD[T]的超類型也不變上T進一步限制的選項,因爲,即使使用一個協變Collection[+T](例如一個List[+T])會出現相同的情況。

我們可以求助於替代方法的多態形式: 上面的版本中缺少的是Spark需要在擦除後保留類信息。

這應該工作:

import scala.reflect.{ClassTag} 
def f[T:ClassTag](data: RDD[(Long, Set[T])]) = { 
    data.flatMap({ 
    case (k, v) => v map { af => 
     (af, 1) 
    } 
    }) reduceByKey(_ + _) 
} 

讓我們來看看:

val intRdd = sparkContext.parallelize(Seq((1l, Set(1,2,3)), (2L, Set(4,5,6)))) 
val res1= f(intRdd).collect 
// Array[(Int, Int)] = Array((4,1), (1,1), (5,1), (6,1), (2,1), (3,1)) 

val strRdd = sparkContext.parallelize(Seq((1l, Set("a","b","c")), (2L, Set("d","e","f")))) 
val res2 = f(strRdd).collect 
// Array[(String, Int)] = Array((d,1), (e,1), (a,1), (b,1), (f,1), (c,1)) 
+0

當RDD只包含一個對象的實例(例如在你的例子中是int和string)時,這是完美的。但假設你有這樣的RDD:'val mixRdd = sc.parallelize(Seq((1l,Set(1,2,3)),(2L,Set(4,5,6)),(3L,Set 「a」,「b」))))'。在這種情況下,代碼失敗。 – Ashkan

+0

這就是b/c Scala推斷一個產品類型與'Set [_>:String with Int]'並且找不到ClassTag。如果你想將類型綁定到一個具體類型,那麼它會工作:'val mixRdd:RDD [(Long,Set [Any])] = sc.parallelize(Seq((1l,Set(1,2,3)) ,(2L,Set(4,5,6)),(3L,Set(「a」,「b」))))' – maasg

相關問題