2017-03-11 56 views
1

我試圖使用scalasparkList和另一個RDD生成RDD。這個想法是獲取一個值列表,並生成一個索引,其中包含包含每個值的原始數據集的所有條目。 下面是我試圖如何使用scala和spark將列表轉換爲RDD

def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = { 
    // filter function 
    def hasVal(foo: String)(bar: Int): Boolean = 
     foo.toInt == bar 
    // call to sc.parallelize to ensure that an RDD is returned 
    sc parallelize(
     foos map (_ match { 
     case (s: String) => (s, bars filter hasVal(s)) 
     }) 
    ) 
    } 

不幸的是這不sbt

> compile 
[info] Compiling 1 Scala source to $TARGETDIR/target/scala-2.11/classes... 
[error] $TARGETDIR/src/main/scala/wikipedia/WikipediaRanking.scala:56: type mismatch; 
[error] found : List[(String, org.apache.spark.rdd.RDD[Int])] 
[error] required: Seq[(String, Iterable[Int])] 
[error] Error occurred in an application involving default arguments. 
[error]  foos map (_ match { 
[error]   ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 1 s, completed Mar 11, 2017 7:11:31 PM 

我真的不明白,我得到了錯誤的編譯代碼。 ListSeq的一個子類,我假定RDDIterable的子類。有什麼明顯的我錯過了嗎?

回答

3

這裏是我的解決方案與-理解(應使用比笛卡爾乘積更少的內存)

def mcveInvertIndex(foos: List[String], 
         bars: RDD[Int]): RDD[(String, Iterable[Int])] = 
    { 

    // filter function 
    def hasVal(foo: String, bar: Int): Boolean = 
     foo.toInt == bar 

    // Producing RDD[(String, Iterable[Int])] 
    (for { 
     bar <- bars // it's important to have RDD 
        // at first position of for-comprehesion 
        // to produce the correct result type 
     foo <- foos 
     if hasVal(foo, bar) 
    } yield (foo, bar)).groupByKey() 
    } 
+1

應當指出的是,你的解決方案關閉了一個局部變量。所以網絡開銷會更高。謹防分佈式工作中關閉的危險。 –

+0

@JustinPihony,謝謝你的注意,所以應該是'aggregateByKey(Iterable.empty [Int])((is,i)=> i :: is.toList,_ ++ _)'而不是'groupByKey'來避免網絡開銷? –

+0

它比groupByKey更少,更多的是在RDD中使用foos –

1

正如在評論中提到的,RDD不是一個可迭代的,所以你必須以某種方式將這兩者合併,然後將它們聚合。這是我快速的解決方案,雖然有可能是一個更有效的方式:

def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = { 
    sc.makeRDD(foos) 
     .cartesian(bars) 
     .keyBy(x=>x._1) 
     .aggregateByKey(Iterable.empty[Int])(
     (agg: Iterable[Int], currVal: (String, Int))=>{ 
      if(currVal._1.toInt != currVal._2) agg 
      else currVal._2 +: agg.toList 
     }, 
     _ ++ _ 
    ) 
    } 
相關問題