2015-02-06 47 views
1

我試圖將管道輸出到不同的目錄中,以便每個目錄的輸出將基於某些ID進行分區。 所以在一個普通的map減少代碼我會使用MultipleOutputs類,我會在減速器中做這樣的事情。如何在Scalding中輸出輸出

protected void reduce(final SomeKey key, 
     final Iterable<SomeValue> values, 
     final Context context) { 

    ... 
    for (SomeValue value: values) { 
    String bucketId = computeBucketIdFrom(...); 
    multipleOutputs.write(key, value, folderName + "/" + bucketId); 
    ... 

,所以我想一個能做到這一點像這樣在滾燙的

... 
    val somePipe = Csv(in, separator = "\t", 
     fields = someSchema, 
     skipHeader = true) 
    .read 

    for (i <- 1 until numberOfBuckets) { 
    somePipe 
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i} 
    .write(Csv(out + "/bucket" + i , 
     writeHeader = true, 
     separator = "\t")) 
    } 

但我覺得你最終會雷丁相同的管道很多次,它會影響到整體性能。

有沒有其他的選擇?

感謝

回答

1

是的,當然有使用TemplatedTsv一個更好的辦法。

所以你上面的代碼可以寫成如下,

val somePipe = Tsv(in, fields = someSchema, skipHeader = true) 
    .read 
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true)) 

這將會把從「未來的所有記錄SOME_ID到單獨的文件夾下了/ some_ids文件夾。

但是,您也可以創建整數存儲桶。只需更改最後一行,

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }  
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket))) 

這將創建兩個數字文件夾out/dd /。您還可以檢查模板化的TSV api here.

使用templatedTsv可能會出現一個小問題,那就是reducer可能會生成大量小文件,這些小文件可能會對您的結果造成下一項工作不利。因此,最好在寫入磁盤之前對模板字段進行排序。我寫了一個關於它的博客here.

+0

你能幫忙回答一個相關的燙傷問題嗎?http://stackoverflow.com/questions/28687539/how-to-output-data-with-hive-style-directory - 結構在燙傷 – 2015-02-24 19:24:22

+0

是的,我已經回答了這個問題http://stackoverflow.com/a/28714754/2908547。希望它是有幫助的,最好的 – morazow 2015-02-25 08:55:12