2017-09-26 28 views
1

我需要知道我正在使用的輸入文件的分區的行索引。我可以通過將行索引連接到數據在原始文件中強制執行此操作,但我寧願在Hadoop中執行此操作。我在映射器中有這個...在Hadoop中獲取輸入文件的分區ID

String id = context.getConfiguration().get("mapreduce.task.partition"); 

但是在任何情況下「id」都是0。在「Hadoop:權威指南」中,它提到訪問屬性,如分區ID「可以通過傳遞給Mapper或Reducer的所有方法的上下文對象來訪問」。從我所知道的來看,它並沒有真正涉及如何獲取這些信息。

我瀏覽了Context對象的文檔,看起來上面是這樣做的方法,腳本也會編譯。但是因爲每個價值都是0,所以我不確定我是否真的使用了正確的東西,但我無法在網上找到任何可以幫助我們搞清楚的細節。

代碼用來測試...

public class Test { 

public static class TestMapper extends Mapper<LongWritable, Text, Text, Text> { 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     String id = context.getConfiguration().get("mapreduce.task.partition"); 
     context.write(new Text("Test"), new Text(id + "_" + value.toString())); 
    } 
} 


public static class TestReducer extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 

     for(Text value : values) { 
      context.write(key, value); 
     } 
    } 
} 


public static void main(String[] args) throws Exception { 

    if(args.length != 2) { 
     System.err.println("Usage: Test <input path> <output path>"); 
     System.exit(-1); 
    } 

    Job job = new Job(); 
    job.setJarByClass(Test.class); 
    job.setJobName("Test"); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setMapperClass(TestMapper.class); 
    job.setReducerClass(TestReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 
+0

我不清楚關於輸入文件分區的行索引究竟是什麼意思。你能澄清嗎? –

+0

@BinaryNerd我可能是錯的,但我認爲這將是輸入文件的行ID。所以說如果文件中有100行,我希望知道映射器正在工作的當前行是什麼(所以從0-99或1-100的數字) – cpd1

回答

1

兩個選項:

  1. 使用偏移而不是行號
  2. 軌跡映射器中的行號

對於第一個,關鍵是LongWritable告訴你正在進行的線的偏移量ssed。除非您的線條長度完全相同,否則您將無法從偏移量計算線條編號,但它確實可以讓您確定排序是否有用。

第二個選項是在映射器中跟蹤它。你可以更改您的代碼是這樣的:

public static class TestMapper extends Mapper<LongWritable, Text, Text, Text> { 

    private long currentLineNum = 0; 
    private Text test = new Text("Test"); 

    public void map(LongWritable key, Text value, Context context) 
          throws IOException, InterruptedException { 

     context.write(test, new Text(currentLineNum + "_" + value)); 
     currentLineNum++; 
    } 
} 
+0

嗯。我可以使用偏移量。我看到它是6的倍數,而且行數應該是相同的長度。對於上面提供的內容,它將如何知道當前行是什麼?我的假設是,製圖人員正在同時工作,所以上面的計數器可能沒有合適的行數。例如,如果要完成的第一個映射器是第5行的映射器,那麼當前的行號不會是1嗎? – cpd1

+0

映射器的每個實例都將按順序處理文件中的行/分割其工作。如果你有多個mapper運行,他們將分別處理他們自己的分割。沒有對發生的文件的併發訪問,因此您可以使用上述簡單的方法跟蹤該行。你需要確保你的輸入不會分裂,所以使用gz壓縮之類的東西。 –

+0

明白了。非常感謝你。我認爲它會是併發的,所以變量會關閉,但只是在一個大的數據集上進行測試,並且完全按照您提到的方式工作。感謝所有的幫助。 – cpd1

0

你也可以代表你的矩陣元組的線路,包括對每一個元組,所以當你的文件正在閱讀的行和山坳,你有信息。如果您使用的空間或逗號分隔的文件組成一個二維數組,那麼很難弄清楚您在映射器中正在處理的行(行)