Mapper is unexpectedly executed multiple times when it is intended to run only once

I am trying to write a very simple job with only 1 mapper and no reducer that writes some data to HBase. In the mapper I simply open a connection to HBase, write a few rows of data to a table, and then close the connection. In the job driver I use JobConf.setNumMapTasks(1); and JobConf.setNumReduceTasks(0); to specify that exactly 1 mapper and no reducer should execute. I also set the reducer class to IdentityReducer in the jobConf. The strange behavior I observe is that the job successfully writes the data into the HBase table, but afterwards I see in the logs that it keeps trying to open a connection to HBase and then close it, and this repeats for 20-30 minutes, after which the job is declared 100% complete and successful. At the end, when I check the output files created alongside the _SUCCESS file, I see hundreds of rows of the dummy data I wrote with OutputCollector.collect(...) when there should only be 1. Below is the job driver code.
Code:

public int run(String[] arg0) throws Exception {
    Configuration config = HBaseConfiguration.create(getConf());
    ensureRequiredParametersExist(config);
    ensureOptionalParametersExist(config);

    JobConf jobConf = new JobConf(config, getClass());
    jobConf.setJobName(config.get(ETLJobConstants.ETL_JOB_NAME));

    // set map specific configuration
    jobConf.setNumMapTasks(1);
    jobConf.setMaxMapAttempts(1);
    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setMapperClass(SingletonMapper.class);
    jobConf.setMapOutputKeyClass(LongWritable.class);
    jobConf.setMapOutputValueClass(Text.class);

    // set reducer specific configuration
    jobConf.setReducerClass(IdentityReducer.class);
    jobConf.setOutputKeyClass(LongWritable.class);
    jobConf.setOutputValueClass(Text.class);
    jobConf.setOutputFormat(TextOutputFormat.class);
    jobConf.setNumReduceTasks(0);

    // set job specific configuration details like input file name etc.
    FileInputFormat.setInputPaths(jobConf, jobConf.get(ETLJobConstants.ETL_JOB_FILE_INPUT_PATH));
    System.out.println("setting output path to : " + jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH));
    FileOutputFormat.setOutputPath(jobConf,
            new Path(jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH)));

    JobClient.runJob(jobConf);
    return 0;
}
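For context, my understanding of the old mapred API is that the framework's runner invokes map() once per input record of the split, not once per task, and that setNumMapTasks(1) is only a hint to the framework. Below is a dependency-free sketch of that invocation loop using plain-Java stand-ins (these are NOT the real Hadoop classes) so I could convince myself how many times the map body runs for a multi-line input file:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in (NOT the real Hadoop classes) for the old mapred API's
// runner loop: the map body is driven once per input record of the split.
public class MapRunnerSketch {

    // Counts how many times the "map body" executes for a given input split.
    static int runMapTask(List<String> split) {
        int mapCalls = 0;
        for (String record : split) {   // runner: while (reader.next(key, value)) map(...)
            mapCalls++;                 // stands in for the work done inside map()
        }
        return mapCalls;
    }

    public static void main(String[] args) {
        // A 5-line input file means the map body (and any side effects in it)
        // executes 5 times, even though there is only one map task.
        List<String> split = Arrays.asList("l1", "l2", "l3", "l4", "l5");
        System.out.println(runMapTask(split)); // prints 5
    }
}
```

So if this model is right, a single map task over a file with many lines would still execute the map body once per line.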
The driver class extends Configured and implements Tool (I followed the sample from the Definitive Guide). Below is the code for my mapper's map method, in which I simply open a connection to HBase, do some preliminary checks to make sure the table exists, then write the rows and close the table.
public void map(LongWritable arg0, Text arg1,
        OutputCollector<LongWritable, Text> arg2, Reporter arg3)
        throws IOException {
    HTable aTable = null;
    HBaseAdmin admin = null;
    try {
        arg3.setStatus("started");
        /*
         * set-up hbase config
         */
        admin = new HBaseAdmin(conf);
        /*
         * open connection to table
         */
        String tableName = conf.get(ETLJobConstants.ETL_JOB_TABLE_NAME);
        HTableDescriptor htd = new HTableDescriptor(toBytes(tableName));
        String colFamilyName = conf.get(ETLJobConstants.ETL_JOB_TABLE_COLUMN_FAMILY_NAME);
        byte[] tablename = htd.getName();
        /* call function to ensure table with 'tablename' exists */

        /*
         * loop and put the file data into the table
         */
        aTable = new HTable(conf, tableName);
        DataRow row = /* logic to generate data */
        while (row != null) {
            byte[] rowKey = toBytes(row.getRowKey());
            Put put = new Put(rowKey);
            for (DataNode node : row.getRowData()) {
                put.add(toBytes(colFamilyName), toBytes(node.getNodeName()),
                        toBytes(node.getNodeValue()));
            }
            aTable.put(put);
            arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo added another data row to hbase");
            row = fileParser.getNextRow();
        }
        aTable.flushCommits();
        arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo Finished adding data to hbase");
    } finally {
        if (aTable != null) {
            aTable.close();
        }
        if (admin != null) {
            admin.close();
        }
    }
    arg2.collect(new LongWritable(10), new Text("something"));
    arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo added some dummy data to the collector");
}
As you can see, at the end I write some dummy data, (10, "something"), to the collector, and after the job terminates I see hundreds of rows of this data in the output files next to the _SUCCESS file. I cannot figure out why the mapper code is restarted multiple times over and over instead of running just once. Any help would be greatly appreciated.
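One thing I am trying to rule out: whether the repeated open/close cycles are simply a consequence of where the connection is created. If the old API calls configure() once per task, map() once per record, and close() once per task, then a connection opened inside map() would be reopened for every input line, while one opened in configure() would be opened only once. A dependency-free sketch with plain-Java stand-ins (NOT the real Hadoop/HBase classes) of the two placements:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in (NOT real Hadoop/HBase classes) comparing where a
// connection is opened: inside map() (per record) vs. in configure() (per task).
public class LifecycleSketch {

    // Connection opened inside the map body: one open per input record.
    static int opensWithConnectionInMap(List<String> split) {
        int opens = 0;
        for (String record : split) {
            opens++;                 // models "new HTable(conf, tableName)" per map() call
        }
        return opens;
    }

    // Connection opened in configure() and closed in close(): one open per task.
    static int opensWithConnectionInConfigure(List<String> split) {
        int opens = 1;               // configure(): open once
        for (String record : split) {
            // map(): reuse the already-open table here
        }
        return opens;                // close(): closed once after all records
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList("a", "b", "c", "d");
        System.out.println(opensWithConnectionInMap(split));       // prints 4
        System.out.println(opensWithConnectionInConfigure(split)); // prints 1
    }
}
```

This still would not explain why the cycles continue for 20-30 minutes after the data is written, so I suspect something else is going on as well.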