2016-11-03 68 views
0

我是新的使用Apache Flink。我從Apache Kafka源讀取數據,需要轉換DataStreamApache Flink - 錯誤:方法適用於參數不適用(WindowFunction)

在最後一步我嘗試應用WindowFunction

DataStream<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> dataStream = 
        env 
        .addSource(new FlinkKafkaConsumer08<>(
            parameterTool.getRequired("topic"), 
            new SimpleStringSchema(), 
            parameterTool.getProperties())) 
        .flatMap(new SplitIntoRecordsString()) 
        .flatMap(new SplitIntoTuples()) 
        .keyBy(1) 
        .countWindow(5) 
        .apply(new windowApplyFunction()); 

    public class windowApplyFunction implements WindowFunction< 
                  Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                  String, 
                  Double, 
                  Window>{ 

    public void apply(Double key, Window window, 
      Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values, 
      Collector<String> out) 
      throws Exception {  
     out.collect("MyResult"); 
    } 
} 

不幸的是我得到了下面的錯誤,不知道如何解決它:如果我

The method apply(WindowFunction<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,R,Tuple,GlobalWindow>) in the type WindowedStream<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,Tuple,GlobalWindow> is not applicable for the arguments (FlinkManager.windowApplyFunction) 

一切正常用預定義的函數替換apply(new windowApplyFunction()),例如sum(1)

回答

0

WindowFunction應該是

WindowFunction< 
    Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
    String, 
    Double, 
    GlobalWindow> 

countWindow()回報GlobalWindow類型類型。

試試看。

0

感謝您的提示vanekjar!糾正這個錯誤後,我改變了另一個小東西,它現在可以工作了! 正確的代碼:

public static class windowApplyFunction implements WindowFunction< 
                  Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                  Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                  Tuple, 
                  GlobalWindow>{ 

    public void apply(Tuple key, GlobalWindow window, 
      Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values, 
      Collector<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> out) 
      throws Exception {  
     out.collect(new Tuple8<Double, Double, String, Double, Double, Double, Double, Double>()); 
    } 
} 
相關問題