2012-11-26 48 views
2

如何在Reducer中使用MultipleOutputs類來編寫多個輸出,每個輸出都可以有其自己的唯一配置? MultipleOutputs javadoc中有一些文檔,但似乎僅限於文本輸出。事實證明,MultipleOutputs可以處理每個輸出的輸出路徑,關鍵類和值類,但嘗試使用需要使用其他配置屬性的輸出格式失敗。 (這個問題已經出現好幾次了,但是我的答案已經被挫敗了,因爲提問者實際上有一個不同的問題,因爲這個問題需要我花了幾天的時間才能回答,所以我是如this Meta Stack Overflow question所示,回答我自己的問題。)如何在Hadoop減速器中寫入多種不同格式的輸出?

回答

2

我爬過MultipleOutputs實現並發現它不支持任何具有outputDir,鍵類和值類以外屬性的OutputFormatType。我試圖編寫自己的MultipleOutputs類,但是失敗了,因爲它需要在Hadoop類的某個地方調用私有方法。

我剩下的只有一種解決方法,它似乎適用於所有情況以及輸出格式和配置的所有組合:編寫我想使用的OutputFormat類的子類(這些結果是可重用的)。這些類瞭解其他OutputFormat同時使用並知道如何存儲其屬性。該設計利用了這樣一個事實,即OutputFormat可以在被請求使用RecordWriter之前配置上下文。

我有了這個與Cassandra的ColumnFamilyOutputFormat工作:

package com.myorg.hadoop.platform; 

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat; 
import org.apache.hadoop.conf.Configurable; 
import org.apache.hadoop.conf.Configuration; 

public abstract class ConcurrentColumnFamilyOutputFormat 
         extends ColumnFamilyOutputFormat 
         implements Configurable { 

private static String[] propertyName = { 
     "cassandra.output.keyspace" , 
     "cassandra.output.keyspace.username" , 
     "cassandra.output.keyspace.passwd" , 
     "cassandra.output.columnfamily" , 
     "cassandra.output.predicate", 
     "cassandra.output.thrift.port" , 
     "cassandra.output.thrift.address" , 
     "cassandra.output.partitioner.class" 
     }; 

private Configuration configuration; 

public ConcurrentColumnFamilyOutputFormat() { 
    super(); 
} 

public Configuration getConf() { 
    return configuration; 
} 

public void setConf(Configuration conf) { 

    configuration = conf; 

    String prefix = "multiple.outputs." + getMultiOutputName() + "."; 

    for (int i = 0; i < propertyName.length; i++) { 
     String property = prefix + propertyName[i]; 
     String value = conf.get(property); 
     if (value != null) { 
      conf.set(propertyName[i], value); 
     } 
    } 

} 

public void configure(Configuration conf) { 

    String prefix = "multiple.outputs." + getMultiOutputName() + "."; 

    for (int i = 0; i < propertyName.length; i++) { 
     String property = prefix + propertyName[i]; 
     String value = conf.get(propertyName[i]); 
     if (value != null) { 
      conf.set(property, value); 
     } 
    } 

} 

public abstract String getMultiOutputName(); 

}

對於每個卡桑德拉(在這種情況下)你想爲你減速機的輸出,你就會有一個類:

package com.myorg.multioutput.ReadCrawled; 

import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat; 

public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat { 

    public StrongOutputFormat() { 
     super(); 
    } 

    @Override 
    public String getMultiOutputName() { 
     return "Strong"; 
    } 

} 

,你會在你的映射器/減速配置類進行配置:

// This is how you'd normally configure the ColumnFamilyOutputFormat 

ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong"); 
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160"); 
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost"); 
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); 

    // This is how you tell the MultipleOutput-aware OutputFormat that 
    // it's time to save off the configuration so no other OutputFormat 
    // steps all over it. 

new StrongOutputFormat().configure(job.getConfiguration()); 

    // This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat 
    // to out set of outputs 

MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class); 

只給另一個例子,輸入多子類FileOutputFormat使用這些屬性:

private static String[] propertyName = { 
     "mapred.output.compression.type" , 
     "mapred.output.compression.codec" , 
     "mapred.output.compress" , 
     "mapred.output.dir" 
     }; 

,並會實現,就像上面不同的是它會使用上述性能。

0

我已經實現了卡珊德拉(見this JIRA ticket,它是目前預定於1.2版本。如果你現在需要它,你可以應用補丁的票。同樣在這給實例的話題退房this presentation MultipleOutputs支持

+0

酷:你能否詳細說明在配置和縮短時間內如何獲得多個Cassandra輸出(RecordWriters等)以便不踩在對方腳趾上? –

+0

實際上它非常簡單。您需要做的是將列族的配置鍵更改爲MultipleOutputs使用的配置鍵以確定它寫入的位置,然後允許用戶通過指定多個輸出或調用setColumnFamily來運行該作業。在Hadoop和Cassandra中都可以正常運行。 –