2012-04-13 60 views
3

在我的MapReduce程序的簡化階段,我執行的唯一操作是在所提供的迭代器concatonate每個值,如下:處理較大的輸出值減少的步驟在Hadoop中

public void reduce(Text key, Iterator<text> values, 
        OutputCollector<Text, Text> output, Reporter reporter) { 
    Text next; 
    Text outKey = new Text() 
    Text outVal = new Text(); 
    StringBuilder sb = new StringBuilder(); 
    while(values.hasNext()) { 
     next = values.next(); 
     sb.append(next.toString()); 
     if (values.hasNext()) 
      sb.append(','); 
    } 
    outKey.set(key.toString()); 
    outVal.set(sb.toSTring()); 
    output.collect(outKey,outVal); 
} 

我的問題是一些減少輸出值是巨大的行文字;非常大,即使是非常大的初始大小,字符串緩衝區也必須多次增加(加倍)其大小以容納迭代器的所有上下文,從而導致內存問題。

在傳統的Java應用程序中,這表示寫入文件的緩衝寫入將是寫入輸出的首選方法。你如何處理Hadoop中非常大的輸出鍵值對?我應該直接將結果流式傳輸到HDFS上的文件(每次縮減調用一個文件)?有沒有辦法緩衝輸出,而不是output.collect方法?

注意:我已經盡最大可能增加了我的記憶/堆積。另外,有幾個消息來源表示,增加reducer的數量可以幫助解決內存/堆的問題,但是這裏的問題直接關係到SringBuilder在擴展容量時的使用。

由於

回答

3

不是我明白你爲什麼想要一個巨大的價值,但有一種方法可以做到這一點。

如果您編寫自己的OutputFormat,則可以根據Key值是否爲null來修復RecordWriter.write(Key, Value)方法處理值串聯的行爲。

這樣,在你減速,你可以按如下編寫代碼(在關鍵的第一輸出是實際的關鍵,在那之後一切都是空的關鍵:

public void reduce(Text key, Iterator<Text> values, 
       OutputCollector<Text, Text> output, Reporter reporter) { 
    boolean firstKey = true; 
    for (Text value : values) { 
    output.collect(firstKey ? key : null, value); 
    firstKey = false; 
    } 
} 

實際RecordWriter.write()遂將下面的邏輯來處理null鍵/值級聯邏輯:

public synchronized void write(K key, V value) throws IOException { 

     boolean nullKey = key == null || key instanceof NullWritable; 
     boolean nullValue = value == null || value instanceof NullWritable; 
     if (nullKey && nullValue) { 
      return; 
     } 

     if (!nullKey) { 
      // if we've written data before, append a new line 
      if (dataWritten) { 
       out.write(newline); 
      } 

      // write out the key and separator 
      writeObject(key); 
      out.write(keyValueSeparator); 
     } else if (!nullValue) { 
      // write out the value delimiter 
      out.write(valueDelimiter); 
     } 

     // write out the value 
     writeObject(value); 

     // track that we've written some data 
     dataWritten = true; 
    } 

    public synchronized void close(Reporter reporter) throws IOException { 
     // if we've written out any data, append a closing newline 
     if (dataWritten) { 
      out.write(newline); 
     } 

     out.close(); 
    } 

你會注意到close方法也被修改爲一個結尾的新行寫入到寫出來

012的最後一個記錄

全部代碼可以在pastebin發現,這裏的測試輸出:

key1 value1 
key2 value1,value2,value3 
key3 value1,value2 
2

如果單輸出鍵值可以更大然後存儲它意味着標準輸出機構不適於 - 因爲,通過inerface設計它需要鍵 - 值對的通過,而不是流。
我認爲最簡單的解決方案是將輸出流傳輸到HDFS文件。
如果您有理由通過輸出格式傳遞數據 - 我建議採用以下解決方案: a)寫入本地臨時目錄
b)將文件的名稱作爲輸出格式的值傳遞。

可能最有效但有點複雜的解決方案是將內存映射文件用作緩衝區。只要有足夠的內存,它將在內存中,並且在需要時,操作系統將關心高效的磁盤溢出。