Apache Flink: sum and keep grouped
Suppose I have records like this:
("a-b", "data1", 1)
("a-c", "data2", 1)
("a-b", "data3", 1)
How can I group and sum these in Apache Flink so that I get the following result?
("a-b", ["data1", "data3"], 2)
("a-c", ["data2"], 1)
Regards, Kevin
I achieved this in the Flink Scala shell ($FLINK_HOME/bin/start-scala-shell.sh local) with the following code:
import org.apache.flink.util.Collector

benv.
  fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1)).
  groupBy(0).
  reduceGroup {
    (it: Iterator[(String, String, Int)], out: Collector[(String, List[String], Int)]) => {
      // Watch out: if the group is _very_ large this can lead to OOM errors
      val group = it.toList
      // For all groups with at least one element (prevent out-of-bounds)
      if (group.length > 0)
        // Get the "name", all the elements and their third-column aggregate
        out.collect((group(0)._1, group.map(_._2), group.map(_._3).sum))
    }
  }.print
which produces the following output:
(a-b,List(data1, data3),2)
(a-c,List(data2),1)
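To check the per-group logic without a Flink cluster, the same transformation can be sketched with plain Scala collections. This is not Flink code: the object name GroupAndSum and the helper groupAndSum are made up for illustration, and the result is sorted by key only to make the printed order deterministic.

```scala
object GroupAndSum {
  // (key, payload, count), mirroring the tuples in the question
  type Record = (String, String, Int)

  // Hypothetical helper: group by the first field, collect the second
  // field into a list, and sum the third field.
  def groupAndSum(records: List[Record]): List[(String, List[String], Int)] =
    records
      .groupBy(_._1)
      .map { case (key, group) => (key, group.map(_._2), group.map(_._3).sum) }
      .toList
      .sortBy(_._1) // sort only so the output order is stable

  def main(args: Array[String]): Unit = {
    val records = List(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1))
    groupAndSum(records).foreach(println)
    // prints:
    // (a-b,List(data1, data3),2)
    // (a-c,List(data2),1)
  }
}
```

Like the reduceGroup version, this holds each whole group in memory, so the same caveat about very large groups applies.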
Thank you! I was hoping there would be something available besides a MapReduce-like solution, but that is (still) not the case. Thanks again :-)!