1
我正在使用Flink 0.10.1的DataSet API編寫應用程序。 我可以使用Flink中的單個操作員獲得多個收集器嗎?Apache Flink:如何使用Flink DataSet API從一個數據集創建兩個數據集
我想要做的是什麼樣的東西如下:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
目前我打電話mapPartition兩次從一個源數據集使兩個數據集。
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem2)
}
}
}
由於doParsing功能是相當昂貴的,我想每行只有一次調用它。
p.s.如果你能讓我知道其他方法來以更簡單的方式做這種事情,我將非常感激。