0
我正在爲某些分佈式處理工具構建基準測試工具,並且在Apache Flink中遇到了一些麻煩。使用Java使用Apache Flink降低Pojo字段
設置很簡單:LogPojo是一個帶有三個字段(長日期,雙值,字符串數據)的簡單Pojo。從列表中刪除我正在尋找具有最小「值」字段的LogPojo。基本上等同於:
pojoList.stream().min(new LogPojo.Comp()).get().getValue();
我弗林克的設置是這樣:
public double processLogs(List<LogPojo> logs) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<LogPojo> logSet = env.fromCollection(logs);
double result = 0.0;
try {
ReduceOperator ro = logSet.reduce(new LogReducer());
List<LogPojo> c = ro.collect();
result = c.get(0).getValue();
} catch (Exception ex) {
System.out.println("Exception caught" + ex);
}
return result;
}
public class LogReducer implements ReduceFunction<LogPojo> {
@Override
public LogPojo reduce(LogPojo o1, LogPojo o2) {
return (o1.getValue() < o2.getValue()) ? o1 : o2;
}
}
它與停止:
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
所以不知何故,似乎是無法應用的減少功能。我只是找不到,爲什麼。任何提示?
謝謝你,對進口的提示解決它。看起來火花斯卡拉依賴項之一是肆虐。一旦我禁用了火花模塊,它就可以工作。 –