2015-09-23 16 views
0

我的代碼:如何在scala中壓縮以下內容?

val result= rdd.filter(x=> x.get[DateTime]("mytime") > offsetvalue._1 && 
      row.get[DateTime]("mytime") <= offsetvalue._2) 

我想壓縮像代碼:

val result = rdd.filter(x => myFunction()) 
where myFunction() {x=> x.get[DateTime]("mytime") > offsetvalue._1 && 
      row.get[DateTime]("mytime") <= offsetvalue._2 } 

時myFunction是調用它顯示exeception:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) ~[spark-core_2.10-1.2.2.2.jar:1.2.2.2] 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) ~[spark-core_2.10-1.2.2.2.jar:1.2.2.2] 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1476) ~[spark-core_2.10-1.2.2.2.jar:1.2.2.2] 
at org.apache.spark.rdd.RDD.filter(RDD.scala:300) ~[spark-core_2.10-1.2.2.2.jar:1.2.2.2] 
at com.aruba.sparkjobs.apprf.LeaderBoardJob.runJob(LeaderBoardJob.scala:203) ~[ee507b50-011f-42de-8bd5-536ca113d640-2015-09-25T11:11:23.637+05:30.jar:1.0.0-b.3] 

如何序列化上面的函數

+0

您可以簡單地將myFun()傳遞給過濾器。 val result = rdd.filter(myFun()),但是您需要修改myFun(),類似於給出x的類型。 – curious

+0

最小化是什麼意思? –

+0

我同意在不同的函數中使用這樣的複雜表達式 - 它可以製作更多可讀的代碼,特別是在一段時間之後回到代碼中。但是,它需要比'myFun'更好的函數名稱! –

回答

2

類似於

def resultFilter(offsetValue: (A, A)) = (x: B) => { 
    val date = x.get[DateTime]("mytime") 
    date > offsetValue._1 && date <= offsetValue._2 
} 

rdd.filter(resultFilter(offsetValue)) 

你必須填寫AB,因爲你的問題中沒有足夠的信息來推斷它們。

+0

@spk這是你的問題。 'A'是'offsetValue'中的值的類型,而'B'是'RDD'中的元素的類型。 –

+0

按'let',你的意思是'val'? ('let'is haskel不是嗎?) – ccheneson

+0

@ccheneson哈哈woops,謝謝你的收穫。 –

1

這不是直接回答你的問題,但你可以讓你的表達更具可讀性這樣:

val (min, max) = offsetValue 
val result = rdd.map(_.get[DateTime]("mytime")) 
    .filter(t => t > min && t <= max) 

,這裏是一個直接回答你的問題:

def myFun(x: YourType): Boolean = { 
    val (min, max) = (dateTime1, dateTime2) // the values from offsetValue, assuming they are constant 
    val t = x.get[DateTime]("mytime") 
    t > min && t <= max 
} 

,然後你將其稱爲

val res = rdd.filter(myFun) 
+0

找不到可序列化的異常如何解決它 – sk1007

相關問題