2015-11-19 69 views
2

弗林克0.10.0剛剛發佈最近。我有一些代碼需要從0.9.1遷移。但出現以下錯誤:弗林克InvalidTypesException:在的TypeVariable「類」「K」的類型不能確定

org.apache.flink.api.common.functions.InvalidTypesException:類'fi.aalto.dmg.frame.FlinkPairWorkloadOperator'中TypeVariable'K'的類型無法確定。這很可能是一種類型的刪除問題。只有在返回類型中的所有變量都可以從輸入類型推導出來的情況下,類型提取才支持具有通用變量的類型。

下面是代碼:

public class FlinkPairWorkloadOperator<K,V> implements PairWorkloadOperator<K,V> { 

    private DataStream<Tuple2<K, V>> dataStream; 

    public FlinkPairWorkloadOperator(DataStream<Tuple2<K, V>> dataStream1) { 
     this.dataStream = dataStream1; 
    } 



    public FlinkGroupedWorkloadOperator<K, V> groupByKey() { 
     KeyedStream<Tuple2<K, V>, K> keyedStream = this.dataStream.keyBy(new KeySelector<Tuple2<K, V>, K>() { 
      @Override 
      public K getKey(Tuple2<K, V> value) throws Exception { 
       return value._1(); 
      } 
     }); 
     return new FlinkGroupedWorkloadOperator<>(keyedStream); 
    } 
} 

要了解InvalidTypesException是如何發生的,我有拋出此異常也另一個例子,我有沒有關於它的想法。在這個演示中,該程序與scala.Tuple2一起使用,但不能鏈接Tuple2。

public class StreamingWordCount { 
    public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     DataStream<String> counts = env 
      .socketTextStream("localhost", 9999) 
      .flatMap(new Splitter()); 

     DataStream<Tuple2<String, Integer>> pairs = mapToPair(counts, mapToStringIntegerPair); 
     pairs.print(); 
     env.execute("Socket Stream WordCount"); 
    } 

    public static class Splitter implements FlatMapFunction<String, String> { 
     @Override 
     public void flatMap(String sentence, Collector<String> out) throws Exception { 
      for (String word: sentence.split(" ")) { 
       out.collect(word); 
      } 
     } 
    } 

    public static <K,V,T> DataStream<Tuple2<K,V>> mapToPair(DataStream<T> dataStream , final MapPairFunction<T, K, V> fun){ 
     return dataStream.map(new MapFunction<T, Tuple2<K, V>>() { 
      @Override 
      public Tuple2<K, V> map(T t) throws Exception { 
       return fun.mapPair(t); 
      } 
     }); 
    } 

    public interface MapPairFunction<T, K, V> extends Serializable { 
    Tuple2<K,V> mapPair(T t); 
    } 

    public static MapPairFunction<String, String, Integer> mapToStringIntegerPair = new MapPairFunction<String, String, Integer>() { 
     public Tuple2<String, Integer> mapPair(String s) { 
      return new Tuple2<String, Integer>(s, 1); 
     } 
    }; 
} 
+0

你能張貼'FlinkPairWorkloadOperator'的完整代碼?在這個文件中https://github.com/wangyangjun/RealtimeStreamBenchmark/blob/master/StreamBench/flink/src/main/java/fi/aalto/dmg/frame/FlinkPairWorkloadOperator.java –

+0

reduceByKey和groupByKey功能。目前我使用Object而不是K. –

+0

我可以重現您的問題。將研究它。 –

回答

1

的問題是,你在弗林克的Java API組合使用scala.Tuple2代替org.apache.flink.api.java.tuple.Tuple2。 Java API的TypeExtractor不理解Scala元組。因此,它不能提取類型變量K的類型。

如果您改用org.apache.flink.api.java.tuple.Tuple2,那麼TypeExtractor將能夠解析類型變量。

+0

感謝您的幫助。有沒有辦法解決這個問題,如果我使用scala.Tuple2?比如使用ResultTypeQueryable?因爲我有一個需要在Spark中實現的高級API,它需要scala.Tuple2。 –

+0

嗨,我剛剛更新了我的問題。我有一個與scala.Tuple2一起使用的演示,但是通過flink Tuple2獲得相同的例外。 –

+0

你爲什麼不簡單使用Flink的Scala API?有了這個,你可以輕鬆處理'scala.Tuples'。即使'ResultTypeQueryable'你會不會解決問題Java API中,因爲你必須要到什麼地方了'TypeInformation [K]'你不能從'GenericTypeInfo [scala.Tuple2]'獲得。你必須顯式傳遞一個'TypeInformation [K]'到'FlinkPairWorkloadOperator'中。 –