如何在Reducer中使用MultipleOutputs類來編寫多個輸出,每個輸出都可以有其自己的唯一配置? MultipleOutputs javadoc中有一些文檔,但似乎僅限於文本輸出。事實證明,MultipleOutputs可以處理每個輸出的輸出路徑,關鍵類和值類,但嘗試使用需要使用其他配置屬性的輸出格式失敗。 (這個問題已經出現好幾次了,但是我的答案已經被挫敗了,因爲提問者實際上有一個不同的問題,因爲這個問題需要我花了幾天的時間才能回答,所以我是如this Meta Stack Overflow question所示,回答我自己的問題。)如何在Hadoop減速器中寫入多種不同格式的輸出?
2
A
回答
2
我爬過MultipleOutputs實現並發現它不支持任何具有outputDir,鍵類和值類以外屬性的OutputFormatType。我試圖編寫自己的MultipleOutputs類,但是失敗了,因爲它需要在Hadoop類的某個地方調用私有方法。
我剩下的只有一種解決方法,它似乎適用於所有情況以及輸出格式和配置的所有組合:編寫我想使用的OutputFormat類的子類(這些結果是可重用的)。這些類瞭解其他OutputFormat同時使用並知道如何存儲其屬性。該設計利用了這樣一個事實,即OutputFormat可以在被請求使用RecordWriter之前配置上下文。
我有了這個與Cassandra的ColumnFamilyOutputFormat工作:
package com.myorg.hadoop.platform;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
public abstract class ConcurrentColumnFamilyOutputFormat
extends ColumnFamilyOutputFormat
implements Configurable {
private static String[] propertyName = {
"cassandra.output.keyspace" ,
"cassandra.output.keyspace.username" ,
"cassandra.output.keyspace.passwd" ,
"cassandra.output.columnfamily" ,
"cassandra.output.predicate",
"cassandra.output.thrift.port" ,
"cassandra.output.thrift.address" ,
"cassandra.output.partitioner.class"
};
private Configuration configuration;
public ConcurrentColumnFamilyOutputFormat() {
super();
}
public Configuration getConf() {
return configuration;
}
public void setConf(Configuration conf) {
configuration = conf;
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(property);
if (value != null) {
conf.set(propertyName[i], value);
}
}
}
public void configure(Configuration conf) {
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(propertyName[i]);
if (value != null) {
conf.set(property, value);
}
}
}
public abstract String getMultiOutputName();
}
對於每個卡桑德拉(在這種情況下)你想爲你減速機的輸出,你就會有一個類:
package com.myorg.multioutput.ReadCrawled;
import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;
public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {
public StrongOutputFormat() {
super();
}
@Override
public String getMultiOutputName() {
return "Strong";
}
}
,你會在你的映射器/減速配置類進行配置:
// This is how you'd normally configure the ColumnFamilyOutputFormat
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
// This is how you tell the MultipleOutput-aware OutputFormat that
// it's time to save off the configuration so no other OutputFormat
// steps all over it.
new StrongOutputFormat().configure(job.getConfiguration());
// This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
// to out set of outputs
MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);
只給另一個例子,輸入多子類FileOutputFormat使用這些屬性:
private static String[] propertyName = {
"mapred.output.compression.type" ,
"mapred.output.compression.codec" ,
"mapred.output.compress" ,
"mapred.output.dir"
};
,並會實現,就像上面不同的是它會使用上述性能。
0
我已經實現了卡珊德拉(見this JIRA ticket,它是目前預定於1.2版本。如果你現在需要它,你可以應用補丁的票。同樣在這給實例的話題退房this presentation MultipleOutputs支持
相關問題
- 1. Hadoop從多種輸入格式中減少
- 2. 如何格式化Mapreduce在Hadoop中寫入的輸出
- 3. 如何使Hadoop減速器爲單個鍵輸出多個值
- 4. 如何在hadoop減速器中寫入int值
- 5. Hadoop減速器原始輸出
- 6. Hadoop的,如何壓縮映射器輸出,但不減速機輸出
- 7. 減速器中的多個輸出
- 8. 的Hadoop:減速機輸出到另一個減速
- 9. 同樣的鑰匙在不同的減速器進來hadoop
- 10. 獲取hadoop減速器中的總輸入路徑
- 11. Hadoop數量的減速器
- 12. Hadoop的循環減速器
- 13. Java:讀取hadoop減速器的輸出文件
- 14. 同樣重要的不同減速器(HADOOP)?
- 15. Hadoop的映射,與減速機的輸出不匹配
- 16. Hadoop - 將減速器編號設置爲0但寫入相同的文件?
- 17. 爲什麼減速機有不同的輸入/輸出鍵,hadoop map/reduce中的值?
- 18. 多個輸出爲一個鍵爲減速功能,Hadoop
- 19. Hadoop和不同格式的輸入如圖像,音頻,視頻
- 20. 哪種格式的壓縮適合hadoop中的大圖輸出?
- 21. 忽略hadoop減速器
- 22. Hadoop mysql限制減速器
- 23. 「減速」輸出
- 24. 我的映射器輸入和減速器輸出是相同的
- 25. Hadoop只生成一個減速器輸出
- 26. Hadoop的減速定製可寫
- 27. Hadoop MapReduce Java實現中的減速器
- 28. Hadoop輸入格式 - 用法
- 29. Hadoop MapReduce不寫輸出
- 30. 的hadoop +可寫接口+閱讀字段拋出在減速器異常
酷:你能否詳細說明在配置和縮短時間內如何獲得多個Cassandra輸出(RecordWriters等)以便不踩在對方腳趾上? –
實際上它非常簡單。您需要做的是將列族的配置鍵更改爲MultipleOutputs使用的配置鍵以確定它寫入的位置,然後允許用戶通過指定多個輸出或調用setColumnFamily來運行該作業。在Hadoop和Cassandra中都可以正常運行。 –