2015-02-05 80 views
3

同樣的問題也適用於將RDD分成幾個新的RDD。Spark將一個DStream分成幾個RDD

DStream或RDD包含幾個不同的案例類,我需要根據案例類的類型將它們變成單獨的RDD。

我所知道的

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" } 

val newRDD = rdd.filter { 
    a => a match { 
    case _: CC1 => true 
    case _ => false 
    } 
} 

但是這需要通過原RDD,每箱類類型一個不少的運行。

  1. 必須有一個更簡潔的方法來做上述匹配過濾器?
  2. 有沒有辦法通過一個平行關卡通過元素類型將rdd分成幾個?

回答

1

看起來像rdd.filter我在長形式的正確軌道上。稍微更簡潔版本是:

val newRDD = rdd.filter { case _: CC1 => true ; case _ => false } 

你不能離開了case _ => false或測試類並不詳盡,你會得到錯誤。我無法讓收集器正常工作。

@maasg得到正確答案的信貸關於做單獨的過濾器通過,而不是黑客的方式來分裂輸入一次通過。

4

1)對於給定類型的過濾的更簡潔的方式是使用rdd.collect(PartialFunction[T,U])

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" } 

等效將是:

val newRDD = rdd.collect{case c:CaseClass1 => c} 

它甚至可以是結合額外的過濾和轉換:

val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget} 

rdd.collect(p:PartialFunction[T,U])不應rdd.collect()它提供數據反饋給駕駛者混淆。


2)要分割RDD(或爲此事DSTREAM)filter是要走的路。必須記住,RDD是分佈式集合。過濾器可讓您將功能應用於該分佈式集合的一個子集,並行地通過集羣。

從原始RDD中產生2個或更多RDD的結構創建將產生1對多的混洗階段,這將會大大增加成本。

+0

好的,但我真的不想收藏嗎?我編輯了上面的代碼,將返回的過濾器分配給一個rdd。換句話說,我確實需要一個過濾器,對吧? – pferrel 2015-02-05 19:30:31

+0

我想這是因爲從閉包返回值比布爾值更容易? – pferrel 2015-02-05 19:48:33

+0

編輯了答案以表明該呼叫返回RDD。正如我所提到的,rdd.collect()和rdd。收集{case x => x.y}有兩個完全不同的目的。 – maasg 2015-02-05 19:50:37

相關問題