2016-12-29 51 views
-2

我想用spark創建數據報告。 我想要做的概念如下。Spark,Scala - 從rdd映射輸出

case class output(txt: String) 
outputList: List[output] = .. 
myrdd 
    .filter(..) 
    .map( 
     some processing 
     outputList ::= output(..) 
    ) 

// this is why I cannot just union rdd with rdd 
anotherRdd.map(
    ...some processing... 
    val rdd = ..make rdd from rdd.. 
    rddinrdd.map( 
     ...some processing... 
     outputList ::= output(..) 
    ) 
) 

// save it as text 
..save outputList somehow.. 

我知道它不會因爲outputList工作將被存儲的所有輸出之前保存的,有沒有辦法做到這一點?

+0

很難理解的結果。你可以請教一下這段文字。你也可以舉一個輸入的例子和你想要達到的預期輸出嗎? – marios

+0

爲什麼在'anotherRdd'中''處理''處理後''兩個RDD' –

回答

1

你正在試圖做未在星火支持兩件事情:

  1. 變異駕駛員側的對象(outputList)的RDD改造內部
  2. 工作與RDD改造內部RDDS( rddinrdd不支持 - RDDS在駕駛員側的存在,他們的轉化都是在做執行者)

很難不建議更具體的要求的替代品,但一般來說,你應該將RDD轉換爲RDD [輸出]:這就是Spark的使用方式 - 不要嘗試構造outputList,嘗試通過轉換構建outputRDD

  • 對於第一RDD,看起來簡單 - 而不是增加output(..)到列表中,只是使該函數的返回值 - 那麼你map操作的結果將是一個RDD[output]

    val outRdd1: RDD[output] = myrdd 
    .filter(..) 
    .map( 
        some processing 
        output(..) 
    ) 
    
  • 對於第二個rdd,您可能需要加入某些鍵上的兩個RDD,假設「rdd中的..make rdd」使用anotherRdd中的當前記錄,因此一般情況下它看起來像這樣:

    val outRdd2: RDD[output] = anotherRdd 
        .keyBy(..extract join key..) 
        .join(myrdd.keyBy(..extract join key..)) 
        .map(
        ...some processing... 
        output(..) 
    ) 
    
  • 最後,您可以聯合所產生的RDDS和保存使用saveAsTextFile

相關問題