2013-05-29 51 views
2

這是背景。我有以下輸入我的MapReduce工作(例如):(其實每一行代表一個用戶查詢這裏並不重要。)如何在Hadoop輸入格式中爲一個輸入行生成多個鍵值對?

Apache Hadoop 
Apache Lucene 
StackOverflow 
.... 

而且我想我RecordReader類讀取一行,然後通過幾個鍵值配對映射器。例如,如果RecordReader得到Apache Hadoop,那麼我想它生成以下鍵值對,並把它傳遞給映射器:

Apache Hadoop - 1 
Apache Hadoop - 2 
Apache Hadoop - 3 

(「 - 」作爲分隔符這裏)而且我發現RecordReader傳球鍵 - 在next()方法值:

next(key, value); 

每次RecordReader.next()被調用時,只有一個密鑰和一個值將被作爲參數傳遞。那麼我應該如何完成我的工作?

回答

2

我相信你可以簡單地使用:

public static class MultiMapper 
     extends Mapper<LongWritable, Text, Text, IntWritable> { 

    @Override 
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 

     for (int i = 1; i <= n; i++) { 
      context.write(value, new IntWritable(i)); 
     } 
    } 
} 

這裏,n是要傳遞值的數量。例如,對於鍵值對你指定:

Apache Hadoop - 1 
Apache Hadoop - 2 
Apache Hadoop - 3 

N將是3

+0

你的意思是我在第一次做這個工作地圖階段?但是這些值將會在下一個Reducer中一起收集。我想要的是將鍵值對發送給不同的映射器。例如,Mapper 1的「Apache Hadoop-1」,Mapper 2的「Apache Hadoop-2」就像這樣。 – Denzel

+0

你爲什麼要這麼做?加快速度?你能解釋一下你的想法嗎? – aa8y

+0

我在這裏犯了一個錯誤。你就在這裏。 – Denzel

0

儘量不給鑰匙: -

context.write(NullWritable.get(), new Text("Apache Hadoop - 1")); 
context.write(NullWritable.get(), new Text("Apache Hadoop - 2")); 
context.write(NullWritable.get(), new Text("Apache Hadoop - 3")); 
+0

我希望在RecordReader中做到這一點,但不是Mapper。 – Denzel

1

我覺得如果你要發送到映射器使用相同的密鑰;你必須實現你的擁有者RecordReader;例如你可以使用MutliRecordReader來擴展LineRecordReade;在這裏你必須改變nextKeyValue方法; 這是從LineRecordReade原代碼:

public boolean nextKeyValue() throws IOException { 
    if (key == null) { 
     key = new LongWritable(); 
    } 
    key.set(pos); 
    if (value == null) { 
     value = new Text(); 
    } 
    int newSize = 0; 
    // We always read one extra line, which lies outside the upper 
    // split limit i.e. (end - 1) 
    while (getFilePosition() <= end) { 
     newSize = in.readLine(value, maxLineLength, 
      Math.max(maxBytesToConsume(pos), maxLineLength)); 
     pos += newSize; 
     if (newSize < maxLineLength) { 
     break; 
     } 

     // line too long. try again 
     LOG.info("Skipped line of size " + newSize + " at pos " + 
       (pos - newSize)); 
    } 
    if (newSize == 0) { 
     key = null; 
     value = null; 
     return false; 
    } else { 
     return true; 
    } 
    } 

,你可以這樣改:

public boolean nextKeyValue() throws IOException { 
    if (key == null) { 
     key = new Text(); 
    } 
    key.set(pos); 
    if (value == null) { 
     value = new Text(); 
    } 
    int newSize = 0; 

    while (getFilePosition() <= end && n<=3) { 
     newSize = in.readLine(key, maxLineLength, 
      Math.max(maxBytesToConsume(pos), maxLineLength));//change value --> key 

    value =Text(n); 
    n++; 
    if(n ==3)// we don't go to next until the N is three; 
     pos += newSize; 

     if (newSize < maxLineLength) { 
     break; 
     } 

     // line too long. try again 
     LOG.info("Skipped line of size " + newSize + " at pos " + 
       (pos - newSize)); 
    } 
    if (newSize == 0) { 
     key = null; 
     value = null; 
     return false; 
    } else { 
     return true; 
    } 
    } 

我認爲這是可以適合你

+0

我懷疑這一點。您爲「值」分配了三個值,但三個值都可以發送給映射器嗎?或者只是將第三個值發送給映射器? – Denzel

+0

拆分只能由一個映射器處理;一個分割器一個映射器 – Winston

+0

一個分割可能等於hdfs中的一個塊,僅由一個映射器處理;一個分裂可以產生許多關鍵值;這些鍵值都由一個映射器處理,因此您不能將相同的拆分鍵值發送給不同的映射器; – Winston

相關問題