我嘗試聯合一些flink數據集。它們包含在Seq中。產生這個問題下面是代碼apache flink的工會類型混淆?
case class clickZap (date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ...
val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]] = for (i <- Range (0,min_n))
yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all = ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)
我得到的是一個
異常線程「main」 org.apache.flink.api.common.InvalidProgramException:不能工會 投入不同種類。 Input1 = scala.Tuple2(_1:Integer,_2: Option [scala.Tuple4(_1:GenericType [java.time.LocalDateTime],_2: String,_3:Integer,_4:Boolean)]),input2 = scala。 Tuple2(_1:整數, _2:選項[scala.Tuple4(_1:GenericType [java.time.LocalDateTime] _2:字符串,_3:整數,_4:布爾)])
我在想什麼?類型沒有不同,是嗎?工會運營商應該很便宜,因此避免這個問題似乎沒有吸引力。 我提供了前兩行代碼作爲DataSets中的數據類型相同的參數。 我用的flink版本是0.9.0和0.9.1
GenericType不一定相等。它實際上是什麼數據類型? –
你是否真的想對'Range(0,min_n)'中的每個元素執行全部縮減操作?爲''afterLastz'中的每個元素創建不同時間差的所有可能值,然後按時間差分組,然後計算每個組中的'maxBeforezTCZ',是不是更好?這樣你也可以避免'union'。無論如何,知道「afterLastz」的類型會有所幫助。 –
Mathias的數據類型參數是java.time.LocalDateTime。 –