2013-07-06 19 views
6

所以我的輸入數據有兩個字段/列:ID1 & ID2,和我的代碼如下:滾燙:如何在groupBy('field){。size}之後保留其他字段?

TextLine(args("input")) 
.read 
.mapTo('line->('id1,'id2)) {line: String => 
    val fields = line.split("\t") 
     (fields(0),fields(1)) 
} 
.groupBy('id2){.size} 
.write(Tsv(args("output"))) 

中的輸出結果(我認爲)兩個字段:ID2 *的大小。我有點卡住發現是否有可能保留id2的值,這也是id2分組,並將其添加爲另一個字段?

回答

8

你不能以恐怖的方式做到這一點。想想它是如何工作的 - 它將數據分割成塊並將其發送到不同的進程,每個進程都計算它的塊數,然後單個reducer將它們全部添加到最後。雖然每個進程都在計算它不知道整個大小,所以它不能添加該字段。唯一的方法是在知道完整大小(即連接)後返回並將其添加到數據中。

如果每個組中裝入內存(你可以配置內存),您可以:

Tsv(args("input"), ('id1, 'id2)) 
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list)) 
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) { 
    case (list, size) => list.map(record => (record._1, record._2, size)) 
} 
.write(Tsv(args("output"))) 

但是如果你的系統沒有足夠的內存,你將不得不使用昂貴的加入。

備註: 您可以使用Tsv代替TextLine,然後使用mapTo和splitting。

+0

請看看它是否有意義,我感到同樣的痛苦。 http://stackoverflow.com/questions/25994879/scalding-flatten-fields-after-groupby – Sergey

相關問題