2016-03-16 59 views
0

我正在嘗試使用Scala的案例類(我想通過元組使用案例類,因爲我想按名稱連接字段)構建數據集。flink中的案例類序列化

下面是一個加入我工作的一個迭代:

case class TestTarget(tacticId: String, partnerId:Long) 

campaignPartners.join(partnerInput).where(1).equalTo("id") { 
    (target, partnerInfo, out: Collector[TestTarget]) => { 
     partnerInfo.partner_pricing match { 
      case Some(pricing) => 
      out.collect(TestTarget(target._1, partnerInfo.partner_id)) 
      case None =>() 
    } 
    } 
} 

顯然,這將引發錯誤:

org.apache.flink.api.common.InvalidProgramException: Task not serializable at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171) at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:121) at org.apache.flink.api.scala.JoinDataSet$$anon$2.(joinDataSet.scala:108) at org.apache.flink.api.scala.JoinDataSet.apply(joinDataSet.scala:107) at com.adfin.dataimport.vendors.dbm.Job.calculateVendorFees(Job.scala:84)

我看到聲明,我需要實現序列化的文檔here爲班級。據我所知,在新版本的Scala中,沒有什麼好方法可以自動序列化案例類。 (我看着手動序列化,但我想我需要做一些額外的工作與鏈接這個工作)。

編輯: 根據Till Rohrmann的建議,我試圖用一個小例子重現這個錯誤。這是我用來嘗試和重現錯誤。這個例子工作,我沒有重現錯誤。我也嘗試將選項案例放在任何地方,但這也導致作業失敗。

val text = env.fromElements("To be, or not to be,--that is the question:--") 

val words = text.flatMap { _.toLowerCase.split("\\W+") }.map(x => (1,x)) 

val nums = env.fromElements(List(1,2,3,4,5)).flatMap(x => x).map(x => First(1,x)) 



val counts = words.join(nums).where(0).equalTo("a") { 
    (a, b, out: Collector[TestTarget]) => { 
    b.b match { 
     case 2 =>() 
     case _ => out.collect(TestTarget(a._2, b.b)) 
    } 
    } 
} 
+0

你能提供一個完整的例子,再現你的問題? –

+0

你還需要什麼?在我調用這個函數後,我使用writeAsText輸出它。 campaignPartners和PartnerInfo是數據集你想要他們的類型簽名? –

+0

我用Flink'1.1-SNAPSHOT'測試了你的例子,它工作得很好。如果錯誤仍然適用於您,請您提供一個完整的示例,包括「First」類型以及您定義它們的位置(哪一個可以簡單複製和粘貼)。理想情況下,您只需發佈Scala文件。 –

回答

0

我的程序的定義中使用的一類

class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment) 
     extends DspJob(conf){ 
    ... 
    case class TestTarget(tacticId: String, partnerId:Long) 
    campaignPartners.join(partnerInput).where(1).equalTo("id") { 
    ... 
} 

由於這是一個內部類,它不會自動如果您切換類不是一個內部類連載

那麼一切正常

case class TestTarget(tacticId: String, partnerId:Long) 
class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment) 
     extends DspJob(conf){ 
    ... 
    words.join(....) 
    ... 
}