在我的程序中,我有一種方法返回一些RDD,我們稱之爲myMethod
,它採用不可序列化的參數,並讓RDD的類型爲Long
(我的真正的RDD是元組類型,但只包含原始類型)。奇怪的「任務不可序列化」與星火
當我嘗試這樣的事:
val x: NonSerializableThing = ...
val l: Long = ...
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing
我得到Task not serializable
。
當我用res + 1L
代替res + l
(即,某個常數)時,它運行。
從序列化跟蹤中,它試圖序列化NonSerializableThing
和扼流器,但是我重新檢查了我的方法,並且此對象從不出現在RDD中。
當我嘗試直接收集myMethod
輸出,即與
myMethod(x, l).take(1) foreach println
我也拿不出問題。
該方法使用NonSerializableThing
獲得上多個卡桑德拉查詢由值的(本地)序列(這是必要的,因爲我需要構造分區鍵來查詢),像這樣:
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val someParam1: String = x.someProperty
x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
val someParam2: String = y.someOtherProperty
y.someOtherSeq.map(someParam3: String =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
map(_.getLong(0))
}.reduce((a, b) => a.union(b))
}
的getSomeSeq
和someOtherSeq
迴歸平淡無火花Seq
小號
我想實現的是「聯盟」多卡珊德拉查詢。
這裏有什麼問題?
編輯,編,由傑姆·塔克的要求:
我有什麼在我的課是這樣的:
implicit class MySparkExtension(sc: SparkContext) {
def getThing(/* some parameters */): NonSerializableThing = { ... }
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val someParam1: String = x.someProperty
x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
val someParam2: String = y.someOtherProperty
y.someOtherSeq.map(someParam3: String =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
map(_.getLong(0))
}.reduce((a, b) => a.union(b))
}
}
這包對象被聲明。問題occurrs這裏:
// SparkContext is already declared as sc
import my.pkg.with.extension._
val thing = sc.getThing(/* parameters */)
val l = 42L
val rdd = sc.myMethod(thing, l)
// until now, everything is OK.
// The following still works:
rdd.take(5) foreach println
// The following causes the exception:
rdd.map(x => x >= l).take(5) foreach println
// While the following works:
rdd.map(x => x >= 42L).take(5) foreach println
我測試了進入「現場」成星火外殼以及在通過提交的算法。
我現在想嘗試(按我最後的評論)如下:
implicit class MySparkExtension(sc: SparkContext) {
def getThing(/* some parameters */): NonSerializableThing = { ... }
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val param1 = x.someProperty
val partitionKeys =
x.getSomeSeq.flatMap(y => {
val param2 = y.someOtherProperty
y.someOtherSeq.map(param3 => (param1, param2, param3, l)
}
queryTheDatabase(partitionKeys)
}
private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = {
partitionKeys.map(k =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4).
map(_.getLong(0))
).reduce((a, b) => a.union(b))
}
}
我相信這可能是工作,因爲RDD在方法queryTheDatabase
現在,這裏不存在NonSerializableThing
構建。
另一種選擇可能是:NonSerializableThing
確實是可序列化的,但我傳入SparkContext
作爲隱含的構造函數參數。我認爲如果我做這個暫時的,它會(無用)被序列化,但不會造成任何問題。
Plz post'mymethod'或至少是它的簽名。 –
我剛剛做到了。 – rabejens
我仍然看不到'def mymethod(...)...'。 2.你的客體在哪裏生活,他們的背景是什麼? –