弗林克0.10.0剛剛發佈最近。我有一些代碼需要從0.9.1遷移。但出現以下錯誤:弗林克InvalidTypesException:在的TypeVariable「類」「K」的類型不能確定
org.apache.flink.api.common.functions.InvalidTypesException:類'fi.aalto.dmg.frame.FlinkPairWorkloadOperator'中TypeVariable'K'的類型無法確定。這很可能是一種類型的刪除問題。只有在返回類型中的所有變量都可以從輸入類型推導出來的情況下,類型提取才支持具有通用變量的類型。
下面是代碼:
public class FlinkPairWorkloadOperator<K,V> implements PairWorkloadOperator<K,V> {
private DataStream<Tuple2<K, V>> dataStream;
public FlinkPairWorkloadOperator(DataStream<Tuple2<K, V>> dataStream1) {
this.dataStream = dataStream1;
}
public FlinkGroupedWorkloadOperator<K, V> groupByKey() {
KeyedStream<Tuple2<K, V>, K> keyedStream = this.dataStream.keyBy(new KeySelector<Tuple2<K, V>, K>() {
@Override
public K getKey(Tuple2<K, V> value) throws Exception {
return value._1();
}
});
return new FlinkGroupedWorkloadOperator<>(keyedStream);
}
}
要了解InvalidTypesException是如何發生的,我有拋出此異常也另一個例子,我有沒有關於它的想法。在這個演示中,該程序與scala.Tuple2一起使用,但不能鏈接Tuple2。
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter());
DataStream<Tuple2<String, Integer>> pairs = mapToPair(counts, mapToStringIntegerPair);
pairs.print();
env.execute("Socket Stream WordCount");
}
public static class Splitter implements FlatMapFunction<String, String> {
@Override
public void flatMap(String sentence, Collector<String> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(word);
}
}
}
public static <K,V,T> DataStream<Tuple2<K,V>> mapToPair(DataStream<T> dataStream , final MapPairFunction<T, K, V> fun){
return dataStream.map(new MapFunction<T, Tuple2<K, V>>() {
@Override
public Tuple2<K, V> map(T t) throws Exception {
return fun.mapPair(t);
}
});
}
public interface MapPairFunction<T, K, V> extends Serializable {
Tuple2<K,V> mapPair(T t);
}
public static MapPairFunction<String, String, Integer> mapToStringIntegerPair = new MapPairFunction<String, String, Integer>() {
public Tuple2<String, Integer> mapPair(String s) {
return new Tuple2<String, Integer>(s, 1);
}
};
}
你能張貼'FlinkPairWorkloadOperator'的完整代碼?在這個文件中https://github.com/wangyangjun/RealtimeStreamBenchmark/blob/master/StreamBench/flink/src/main/java/fi/aalto/dmg/frame/FlinkPairWorkloadOperator.java –
reduceByKey和groupByKey功能。目前我使用Object而不是K. –
我可以重現您的問題。將研究它。 –