2016-08-03 70 views
1

我目前正試圖在Apache Spark和Apache Flink中實現一些算法。在執行這些算法時,我必須做一些設置的差異/減法操作。Apache Flink DataSet差異/減法運算

雖然Apache Spark有一個內置的subtract操作,但我在Apache Flink(1.0.3和1.1.0-SNAPSHOT)中找不到類似的東西。

所以我的問題是,給定兩個DataSet對象d1, d2都包含相同類型T,什麼是應用設置差異的最有效方法,即d1\d2

val d1: DataSet[T] = ... 
val d2: DataSet[T] = ... 
val d_diff: DataSet[T] = ??? 

有可能是通過coGroup

val d_diff = d1.coGroup(d2).where(0).equalTo(0) { 
       (l, r, out: Collector[T]) => { 
       val rightElements = r.toSet 
       for (el <- l) 
        if (!rightElements.contains(el)) out.collect(el) 
       } 
      } 

某種方式,但我不知道這是否是正確的和最佳實踐或不有人知道一些更有效?

回答

2

DataSet API不提供方法,因爲它只包含非常基本的一組操作。 1.1中的Table API將有一個設置減號運算符。你可以看到它是如何實現的here。使用this CoGroupFunction。所以是的,你在正確的軌道上。