2015-11-17 86 views
0

我想聲明一個函數來得到兩個RDDcogroup。其實這是一個interSectionByKey。下面的代碼不能編譯:如何聲明函數來調用cogroup

def getRetain[K, V](activeUserRdd : RDD[(K, V)], newUserRdd : RDD[(K, V)]): RDD[(K, V)] ={ 
    activeUserRdd.cogroup(newUserRdd).flatMapValues{ 
     x => Option((if (!x._1.isEmpty && !x._2.isEmpty) x._2.head else null).asInstanceOf[V]) 
    } 
    } 

錯誤:

value cogroup is not a member of org.apache.spark.rdd.RDD[(K, V)] 

我覺得(K, V)匹配小姐在cogroup聲明的真正[(K, V)],但是這是在我的函數聲明的正確方法?

回答

0

ClassTag應用於您的輸入類型,以確保在運行時可以訪問已擦除類型KV。這是由於type erasure in Scala

scala> import scala.reflect.ClassTag 
import scala.reflect.ClassTag 

scala> def getRetain[K : ClassTag, V : ClassTag](activeUserRdd : RDD[(K, V)], newUserRdd : RDD[(K, V)]): RDD[(K, V)] ={ 
|  activeUserRdd.cogroup(newUserRdd).flatMapValues{ 
|   x => Option((if (!x._1.isEmpty && !x._2.isEmpty) x._2.head else null).asInstanceOf[V]) 
|  } 
|  } 
getRetain: [K, V](activeUserRdd: org.apache.spark.rdd.RDD[(K, V)], newUserRdd: org.apache.spark.rdd.RDD[(K, V)])(implicit evidence$1: scala.reflect.ClassTag[K], implicit evidence$2: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[(K, V)] 
+0

我沒有這個包scala.reflect.TypeTag,你知道一些其他的解決方案,只是使用原來的階。我認爲它可以幫助我理解scala。 編譯時發生錯誤。謝謝。 – yyforever1988

+0

對不起,我打錯輸入。請再試一次。 –

+0

它的工作,謝謝 – yyforever1988