2017-08-15 80 views

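Cannot extend Flink ProcessFunction

I recently upgraded from Flink 1.2 to Flink 1.3, and I am trying to update my ProcessFunction to work with 1.3. I have a class that is meant to extend ProcessFunction, but it fails with a compile error saying that I am not overriding processElement and onTimer. Here is my code: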

class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] { 
    lazy val state : ListState[CountWithTimestamp] = getRuntimeContext 
     .getListState(new ListStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])) 

    override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = { 
     //Stuff here 
    } 

    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = { 
     //More Stuff here 
    } 
} 

Here are the compile errors I am getting:

Error:(8, 7) class TimeoutStateFunction needs to be abstract, since method processElement in class ProcessFunction of type (x$1: com.fasterxml.jackson.databind.node.ObjectNode, x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context, x$3: org.apache.flink.util.Collector[(String, Long)])Unit is not defined 
class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] { 

Error:(17, 18) method processElement overrides nothing. 
Note: the super classes of class TimeoutStateFunction contain the following, non final members named processElement: 
def processElement(x$1: com.fasterxml.jackson.databind.node.ObjectNode,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit 
    override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = { 

Error:(36, 16) method onTimer overrides nothing. 
Note: the super classes of class TimeoutStateFunction contain the following, non final members named onTimer: 
def onTimer(x$1: Long,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#OnTimerContext,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit 
    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = { 

I am currently using Scala 2.11 and Flink 1.3.2.

Answer

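Context and OnTimerContext are inner classes of ProcessFunction, so their types depend on the ProcessFunction and its input and output types. From Scala they can be referenced through a type projection (ProcessFunction[IN, OUT]#Context), which is the form the compiler prints in the error messages.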

So this should work:

override def processElement(
    value: ObjectNode, 
    ctx: ProcessFunction[ObjectNode, (String, Long)]#Context, 
    out: Collector[(String, Long)]) 
    : Unit = { 
    //Stuff here 
} 

override def onTimer(
    timestamp: Long, 
    ctx: ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext, 
    out: Collector[(String, Long)]) 
    : Unit = { 
    //More Stuff here 
} 
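
For reference, here is a minimal sketch of the whole class with the imports it needs, assuming Flink 1.3.x and Jackson are on the classpath. The type aliases Ctx and TimerCtx are hypothetical names introduced here only to shorten the signatures; they are not part of Flink. The state field from the question is left out because CountWithTimestamp is not shown there.

```scala
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

  // Hypothetical aliases (not Flink API): they expand to the type
  // projections that the compiler expects in the overridden signatures.
  type Ctx = ProcessFunction[ObjectNode, (String, Long)]#Context
  type TimerCtx = ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext

  override def processElement(
      value: ObjectNode,
      ctx: Ctx,
      out: Collector[(String, Long)]): Unit = {
    // Stuff here
  }

  override def onTimer(
      timestamp: Long,
      ctx: TimerCtx,
      out: Collector[(String, Long)]): Unit = {
    // More stuff here
  }
}
```

Since a type alias is interchangeable with the type it stands for, these signatures should be equivalent to writing out the full projection in each method, as in the snippet above.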

Works great, thanks! – Eumcoz