我正在嘗試使用Scala的案例類(我想通過元組使用案例類,因爲我想按名稱連接字段)構建數據集。flink中的案例類序列化
下面是一個加入我工作的一個迭代:
case class TestTarget(tacticId: String, partnerId:Long)
campaignPartners.join(partnerInput).where(1).equalTo("id") {
(target, partnerInfo, out: Collector[TestTarget]) => {
partnerInfo.partner_pricing match {
case Some(pricing) =>
out.collect(TestTarget(target._1, partnerInfo.partner_id))
case None =>()
}
}
}
顯然,這將引發錯誤:
org.apache.flink.api.common.InvalidProgramException: Task not serializable at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171) at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:121) at org.apache.flink.api.scala.JoinDataSet$$anon$2.(joinDataSet.scala:108) at org.apache.flink.api.scala.JoinDataSet.apply(joinDataSet.scala:107) at com.adfin.dataimport.vendors.dbm.Job.calculateVendorFees(Job.scala:84)
我看到聲明,我需要實現序列化的文檔here爲班級。據我所知,在新版本的Scala中,沒有什麼好方法可以自動序列化案例類。 (我看着手動序列化,但我想我需要做一些額外的工作與鏈接這個工作)。
編輯: 根據Till Rohrmann的建議,我試圖用一個小例子重現這個錯誤。這是我用來嘗試和重現錯誤。這個例子工作,我沒有重現錯誤。我也嘗試將選項案例放在任何地方,但這也導致作業失敗。
val text = env.fromElements("To be, or not to be,--that is the question:--")
val words = text.flatMap { _.toLowerCase.split("\\W+") }.map(x => (1,x))
val nums = env.fromElements(List(1,2,3,4,5)).flatMap(x => x).map(x => First(1,x))
val counts = words.join(nums).where(0).equalTo("a") {
(a, b, out: Collector[TestTarget]) => {
b.b match {
case 2 =>()
case _ => out.collect(TestTarget(a._2, b.b))
}
}
}
你能提供一個完整的例子,再現你的問題? –
你還需要什麼?在我調用這個函數後,我使用writeAsText輸出它。 campaignPartners和PartnerInfo是數據集你想要他們的類型簽名? –
我用Flink'1.1-SNAPSHOT'測試了你的例子,它工作得很好。如果錯誤仍然適用於您,請您提供一個完整的示例,包括「First」類型以及您定義它們的位置(哪一個可以簡單複製和粘貼)。理想情況下,您只需發佈Scala文件。 –