2017-03-09 41 views
0

我正在爲某些分佈式處理工具構建基準測試工具,並且在Apache Flink中遇到了一些麻煩。使用Java使用Apache Flink降低Pojo字段

設置很簡單:LogPojo是一個帶有三個字段(長日期,雙值,字符串數據)的簡單Pojo。從列表中刪除我正在尋找具有最小「值」字段的LogPojo。基本上等同於:

pojoList.stream().min(new LogPojo.Comp()).get().getValue(); 

我弗林克的設置是這樣:

public double processLogs(List<LogPojo> logs) { 

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<LogPojo> logSet = env.fromCollection(logs); 

    double result = 0.0; 
    try { 
     ReduceOperator ro = logSet.reduce(new LogReducer()); 
     List<LogPojo> c = ro.collect(); 
     result = c.get(0).getValue(); 
    } catch (Exception ex) { 
     System.out.println("Exception caught" + ex); 
    } 

    return result; 
} 

public class LogReducer implements ReduceFunction<LogPojo> { 

    @Override 
    public LogPojo reduce(LogPojo o1, LogPojo o2) { 
     return (o1.getValue() < o2.getValue()) ? o1 : o2; 
    } 
} 

它與停止:

Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet; 

所以不知何故,似乎是無法應用的減少功能。我只是找不到,爲什麼。任何提示?

回答

1

首先,你應該檢查你的進口。你從Scala類中得到一個異常,但是你的程序是用Java實現的。您可能意外地導入了Scala DataSet API。使用Java API不應該導致Scala異常(除非您使用依賴於Scala的類)。無論是

,弗林克有一個內置的聚合方法minmax

DataSet<LogPojo> logSet = env.fromCollection(logs); 
// map LogPojo to a Tuple1<Double> 
// (Flink's built-in aggregation functions work only on Tuple types) 
DataSet<Tuple1<Double>> values = logSet.map(new MapFunction<LogPojo, Tuple1<Double>>() { 
    @Override 
    public Tuple1<Double> map(LogPojo l) throws Exception { 
     return new Tuple1<>(l.value); 
    } 
    }); 
// fetch the min value (at position 0 in the Tuple) 
List<Tuple1<Double>> c = values.min(0).collect(); 
// get the first field of the Tuple 
Double minVal = c.get(0).f0; 
+0

謝謝你,對進口的提示解決它。看起來火花斯卡拉依賴項之一是肆虐。一旦我禁用了火花模塊,它就可以工作。 –