2015-09-04 89 views
2

我嘗試聯合一些flink數據集。它們包含在Seq中。產生這個問題下面是代碼apache flink的工會類型混淆?

case class clickZap (date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean) 
val afterLastz: DataSet[clickZap]= ... 

val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]] = for (i <- Range (0,min_n)) 
     yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i)))) 
//val ma_all = ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _) 
val ma_all = ma_range.head union(ma_range.tail.head) 

我得到的是一個

異常線程「main」 org.apache.flink.api.common.InvalidProgramException:不能工會 投入不同種類。 Input1 = scala.Tuple2(_1:Integer,_2: Option [scala.Tuple4(_1:GenericType [java.time.LocalDateTime],_2: String,_3:Integer,_4:Boolean)]),input2 = scala。 Tuple2(_1:整數, _2:選項[scala.Tuple4(_1:GenericType [java.time.LocalDateTime] _2:字符串,_3:整數,_4:布爾)])

我在想什麼?類型沒有不同,是嗎?工會運營商應該很便宜,因此避免這個問題似乎沒有吸引力。 我提供了前兩行代碼作爲DataSets中的數據類型相同的參數。 我用的flink版本是0.9.0和0.9.1

+0

GenericType不一定相等。它實際上是什麼數據類型? –

+0

你是否真的想對'Range(0,min_n)'中的每個元素執行全部縮減操作?爲''afterLastz'中的每個元素創建不同時間差的所有可能值,然後按時間差分組,然後計算每個組中的'maxBeforezTCZ',是不是更好?這樣你也可以避免'union'。無論如何,知道「afterLastz」的類型會有所幫助。 –

+0

Mathias的數據類型參數是java.time.LocalDateTime。 –

回答

2

這個問題是Flink自己的打字系統中的一個錯誤。表示斯卡拉OptionOptionTypeInfo沒有定義適當的方法equals。因此,兩個OptionTypeInfos未被檢測到相等。

我創建了一個JIRA issue並打開了一個Pull Request來解決這個問題。拉取請求應在兩天內合併。如果您然後使用最新的0.10-SNAPSHOT版本,那麼您的問題應該得到解決。

+0

這說明了這個問題。缺少equals()方法[反映在文檔中。](https://ci.apache.org/projects/flink/flink-docs-release-0.9/api/scala/index.html?org/apache /flink/api/scala/operators/IterativeDataSet.html#org.apache.flink.api.scala.typeutils.OptionTypeInfo)我的猜測是與Either/EitherTypeInfo有相同的問題。 –

+0

@SpyrosKomninos,你完全正確。感謝您捕捉它:-)我也會解決這個錯誤。 –