2016-09-29 55 views
3

我注意到java apache束有類groupby.sortbytimestamp python是否實現了該功能?如果不是,將在窗口中排序元素的方式是什麼?我想我可以在DoFn中整理整個窗口,但我想知道是否有更好的方法。如何在python apache梁中的窗口中排序元素?

+0

你在哪裏找到這門課嗎?我認爲它不再存在:https://github.com/apache/beam/search?utf8=%E2%9C%93&q=sortbytimestamp&type= – skeller88

回答

6

Beam中目前沒有內置的值排序(無論是Python還是Java)。現在,最好的選擇是像你剛纔提到的那樣,在DoFn中自己排序值。

1

這是一個使用CombineFn的解決方案。它具有使用TreeSet重複數據刪除的額外好處。您還應該確保您的窗口數據足夠小,以適應單個工作人員的內存。

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> { 
@Override 
public TreeSet<MarketData> createAccumulator() { 
    return new TreeSet<>(Comparator 
      .comparingLong(MarketData::getEventTime) 
      .thenComparing(MarketData::getOrderbookType)); 
} 

@Override 
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) { 
    accum.add(input); 
    return accum; 
} 

@Override 
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) { 

    TreeSet<MarketData> merged = createAccumulator(); 
    for (TreeSet<MarketData> accum : accums) { 
     merged.addAll(accum); 
    } 
    return merged; 
} 

@Override 
public List<MarketData> extractOutput(TreeSet<MarketData> accum) { 
    return Lists.newArrayList(accum.iterator()); 
} 

}