2014-04-08 41 views
2

後保留所有的領域,我做了groupBy計算的值,但似乎我的組,我失去了所有未在聚集鍵的字段:燙:GROUPBY

filtered.filterNot('site) {s:String => ...} 
     .filterNot('date) {s:String => ...} 
aggr = filtered.groupBy('id, 'contentHost) { group => 
    group.min('timestamp -> 'min) 
    //how do I keep original fields? (eg: site, date) 
} 

aggr.store(Tsv(...)) //eg: field "site" won't be here 

在豬,這將是這樣的:

aggr = group filtered by concat('id, 'contentHost); 

result = foreach aggr { 
    generate flatten(filtered), //how to do this in scalding? 
      min(filtered.timestamp) as min; 
} 

回答

4

我有同樣的問題與元組API,只能通過使用類型化的API解決這個問題。

您既可以使用Scala元組,也可以在作業外定義自己的案例類。例如: -

case class Data(id: String, site: String, date: String, contentHost: String) 

然後你會處理這樣的:

val filtered: TypedPipe[Data] = TypedPipe.from(Seq(Data("...", "2014-04-14", "...", "..."))) 

filtered 
    .filterNot (data => data.site == "fr") 
    .filterNot (data => data.date == "2014-02-01") 
    .groupBy (data => (data.id, data.contentHost)) // (String,String) -> Data 
    .min // or .minBy { ... } 
    .toTypedPipe 
    .write(TypedTsv[((String, String), Data)]("/path/")) 
+0

謝謝!我很難相信這對於tulpe API來說是不可能的。要多研究一下。 –

+0

保持我們的發佈:)這裏也有同樣的問題:http://stackoverflow.com/questions/17507526/scalding-how-to-retain-the-other-field-after-a-groupbyfield-size?rq=1 –

+0

嗨,我們可以在沒有這個spagetti的情況下使用元組API嗎? – Sergey