我試圖使用scala
和spark
從List
和另一個RDD
生成RDD
。這個想法是獲取一個值列表,並生成一個索引,其中包含包含每個值的原始數據集的所有條目。 下面是我試圖如何使用scala和spark將列表轉換爲RDD
def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = {
// filter function
def hasVal(foo: String)(bar: Int): Boolean =
foo.toInt == bar
// call to sc.parallelize to ensure that an RDD is returned
sc parallelize(
foos map (_ match {
case (s: String) => (s, bars filter hasVal(s))
})
)
}
不幸的是這不sbt
> compile
[info] Compiling 1 Scala source to $TARGETDIR/target/scala-2.11/classes...
[error] $TARGETDIR/src/main/scala/wikipedia/WikipediaRanking.scala:56: type mismatch;
[error] found : List[(String, org.apache.spark.rdd.RDD[Int])]
[error] required: Seq[(String, Iterable[Int])]
[error] Error occurred in an application involving default arguments.
[error] foos map (_ match {
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 1 s, completed Mar 11, 2017 7:11:31 PM
我真的不明白,我得到了錯誤的編譯代碼。 List
是Seq
的一個子類,我假定RDD
是Iterable
的子類。有什麼明顯的我錯過了嗎?
應當指出的是,你的解決方案關閉了一個局部變量。所以網絡開銷會更高。謹防分佈式工作中關閉的危險。 –
@JustinPihony,謝謝你的注意,所以應該是'aggregateByKey(Iterable.empty [Int])((is,i)=> i :: is.toList,_ ++ _)'而不是'groupByKey'來避免網絡開銷? –
它比groupByKey更少,更多的是在RDD中使用foos –