2015-10-16 38 views
0

在我的程序中,我有一種方法返回一些RDD,我們稱之爲myMethod,它採用不可序列化的參數,並讓RDD的類型爲Long(我的真正的RDD是元組類型,但只包含原始類型)。奇怪的「任務不可序列化」與星火

當我嘗試這樣的事:

val x: NonSerializableThing = ... 
val l: Long = ... 
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing 

我得到Task not serializable

當我用res + 1L代替res + l(即,某個常數)時,它運行。

從序列化跟蹤中,它試圖序列化NonSerializableThing和扼流器,但是我重新檢查了我的方法,並且此對象從不出現在RDD中。

當我嘗試直接收集myMethod輸出,即與

myMethod(x, l).take(1) foreach println 

我也拿不出問題。

該方法使用NonSerializableThing獲得上多個卡桑德拉查詢由值的(本地)序列(這是必要的,因爲我需要構造分區鍵來查詢),像這樣:

def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = { 
    val someParam1: String = x.someProperty 
    x.getSomeSeq.flatMap(y: OtherNonSerializableThing => { 
    val someParam2: String = y.someOtherProperty 
    y.someOtherSeq.map(someParam3: String => 
     sc.cassandraTable("fooKeyspace", "fooTable"). 
     select("foo"). 
     where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l). 
     map(_.getLong(0)) 
    }.reduce((a, b) => a.union(b)) 
} 

getSomeSeqsomeOtherSeq迴歸平淡無火花Seq小號

我想實現的是「聯盟」多卡珊德拉查詢。

這裏有什麼問題?

編輯,編,由傑姆·塔克的要求:

我有什麼在我的課是這樣的:

implicit class MySparkExtension(sc: SparkContext) { 

    def getThing(/* some parameters */): NonSerializableThing = { ... } 

    def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = { 
    val someParam1: String = x.someProperty 
    x.getSomeSeq.flatMap(y: OtherNonSerializableThing => { 
     val someParam2: String = y.someOtherProperty 
     y.someOtherSeq.map(someParam3: String => 
     sc.cassandraTable("fooKeyspace", "fooTable"). 
     select("foo"). 
     where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l). 
     map(_.getLong(0)) 
    }.reduce((a, b) => a.union(b)) 
    } 
} 

這包對象被聲明。問題occurrs這裏:

// SparkContext is already declared as sc 
import my.pkg.with.extension._ 

val thing = sc.getThing(/* parameters */) 
val l = 42L 
val rdd = sc.myMethod(thing, l) 
// until now, everything is OK. 
// The following still works: 
rdd.take(5) foreach println 
// The following causes the exception: 
rdd.map(x => x >= l).take(5) foreach println 
// While the following works: 
rdd.map(x => x >= 42L).take(5) foreach println 

我測試了進入「現場」成星火外殼以及在通過​​提交的算法。

我現在想嘗試(按我最後的評論)如下:

implicit class MySparkExtension(sc: SparkContext) { 

    def getThing(/* some parameters */): NonSerializableThing = { ... } 

    def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = { 
    val param1 = x.someProperty 
    val partitionKeys = 
     x.getSomeSeq.flatMap(y => { 
     val param2 = y.someOtherProperty 
     y.someOtherSeq.map(param3 => (param1, param2, param3, l) 
     } 
    queryTheDatabase(partitionKeys) 
    } 

    private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = { 
    partitionKeys.map(k => 
     sc.cassandraTable("fooKeyspace", "fooTable"). 
     select("foo"). 
     where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4). 
     map(_.getLong(0)) 
    ).reduce((a, b) => a.union(b)) 
    } 
} 

我相信這可能是工作,因爲RDD在方法queryTheDatabase現在,這裏不存在NonSerializableThing構建。

另一種選擇可能是:NonSerializableThing確實是可序列化的,但我傳入SparkContext作爲隱含的構造函數參數。我認爲如果我做這個暫時的,它會(無用)被序列化,但不會造成任何問題。

+0

Plz post'mymethod'或至少是它的簽名。 –

+0

我剛剛做到了。 – rabejens

+0

我仍然看不到'def mymethod(...)...'。 2.你的客體在哪裏生活,他們的背景是什麼? –

回答

1

當您將l替換爲1L Spark不再嘗試使用in中的方法/變量序列化類,因此不會拋出錯誤。

您應該能夠通過將val x: NonSerializableThing = ...標記爲瞬態即可修復。

@transient 
val x: NonSerializableThing = ... 

這意味着當類被序列化時,這個變量應該被忽略。

+0

當通過參數傳入時,是否也可以將'x:NonSerialiyableThing'聲明爲transient?或者,如果我使用'@transient val x1:NonSerializableThing = x'並從此使用'x1'就足夠了? – rabejens

+0

如果你的意思是作爲參數傳入類構造器,那麼是的。你能發佈包含此代碼的完整類def嗎? –

+0

不幸的是,我不能,因爲我不允許發佈公司代碼。我將在明天嘗試以下內容:構造包含所有分區鍵(僅包含字符串,長整型等)的'Seq',並將此(僅此)傳遞給在集羣上執行Cassandra查詢的私有方法。我認爲這可能是一個可行的解決方法,因爲在構建RDD時,範圍中不存在NonSerializableThing。 – rabejens