2015-11-23 58 views
1

我希望消除Foo的歧義。他們中的一些需要分解成單獨的實例。每個需要一個獨特的,連續的Id。展開RDD並將連續ID分配給新元素

val maxId: Long = foos.map(_.id).max() 

foos.flatMap { foo => 
    if (foo.bar) List(foo, foo.copy(id = ???, ...)) 
    else List(foo) 
} 

在普通的Scala中,我會使用foldLeft。使用Spark,我能想到的最好的是flatmap到(Foo, Option[Long]),由_._2.isEmpty過濾,zipWithIndex並加入。有更聰明的方法嗎?

例如鑑於

case class Foo(id: Long) { 
    val bar: Boolean = id % 2 == 1 
} 

此輸入

RDD(Foo(1), Foo(2), Foo(3)) 

應該成爲

RDD(Foo(1), Foo(2), Foo(3), Foo(4), Foo(5)) 

因爲Foo(1)Foo(3)擴大,並就下一個可用的IDS(4 & 5)。

+0

也許別人可以理解你的問題是什麼,但我無法弄清楚。一般來說,在常規Scala集合上習慣的所有典型操作(特別是monad操作)都可以用於RDD。 – Phasmid

+0

啊,好的。我在問題中提出了一個簡潔的例子。 – Synesso

+0

@Synesso是否收集元素的順序很重要? – Odomontois

回答

1

在任何可以彼此獨立生產的分佈式系統標識中,性能優於順序生成器。

好的地方是.copy(id = randomLong),最好的辦法是.copy(id = UUID.randomUUID())

但問題是,具體的關於連續 IDS。我對這種情況下的建議是

import Numeric.Implicits._ 
import scala.reflect.ClassTag 

abstract class UpdateIDS[T: ClassTag, Id: Numeric : ClassTag] extends Serializable { 
    def getId(elem: T): Id 
    def setId(elem: T, id: Id): T 
    def shouldChange(elem: T): Boolean 
    val Id = implicitly[Numeric[Id]] 

    def apply(xs: RDD[T]): RDD[T] = { 
    val next = xs.map(getId).max + Id.one 
    val counts: Seq[(Int, Int)] = xs.mapPartitionsWithIndex { (idx, elems) => 
     Iterator.single(idx, elems.count(shouldChange)) 
    }.collect.view 
    val starts = counts.map(_._2).map(Id.fromInt).scanLeft(next)(_ + _) 
    val startMapLocal = counts.zip(starts).map { case ((idx, _), start) => (idx, start) }.toMap 
    val startMap = xs.context.broadcast(startMapLocal) 

    xs.mapPartitionsWithIndex { case (idx, elems) => 
     elems.scanLeft((List.empty[T], startMap.value(idx))) { (pair, elem) => 
     pair match { 
      case (_, counter) if shouldChange(elem) => (List(elem, setId(elem, counter)), counter + Id.one) 
      case (_, counter) => (List(elem), counter) 
     } 
     }.flatMap { _._1 } 
    } 
    } 
} 

用,你可以很容易地定義

object fooUpdateId extends UpdateIDS[Foo, Int] { 
    def getId(foo: Foo) = foo.id 
    def setId(foo: Foo, id: Int) = foo.copy(id = id) 
    def shouldChange(foo: Foo) = foo.id % 2 == 1 
} 

,然後運行

val foosUpdated = fooUpdateId(foos) 

重要的注意在這裏,產生收集的順序被改變爲更高性能解決方案。如果您需要在不太大的RDD中訂購,則可以使用sortBy

還注意到使用mapAccumLLens from scalaz可能會使UpdadeIDs的實現更簡單一些,但我選擇避免使用外部庫。

+0

我假設zipWithIndex是分區本地的,對不對? 因此,這裏的要點是計算每個分區要更改的數量並分配開始索引,將其廣播到集羣,然後每個分區使用它自己的開始索引來分配唯一的連續索引。 我會給它一個鏡頭。 – Synesso

+0

@Synesso確切地說,如果'zipWithIndex'指的是'mapPartitionsWithIndex' – Odomontois