2015-06-18 54 views
0

我剛開始學習Hadoop。我試圖將流式接口與處理文件的Python腳本一起使用:對於每個輸入文件,我都會創建一個輸出文件並提供關於它的一些信息,所以這是一個沒有縮減器的映射作業。我發現的是,文件正在一次處理一個,這不是我想要的。Hadoop Streaming作業中並行映射器任務的數量

我會解釋我所做的事情,但之後我還會發布一些代碼以防萬一我在那裏丟失了某些東西。

我有一個輸入格式和記錄閱讀器,它讀取整個文件並將其內容用作值和文件名作爲鍵。 (這些文件並不是很大。)另一方面,我有一個輸出格式和記錄寫入器,它將值寫入基於密鑰名稱的文件中。我正在使用-io rawbytes,我的Python腳本知道如何讀取和寫入鍵/值對。

這一切工作正常,在生產我期待的產出方面。如果我用例如10個輸入文件運行,我會得到10個分割。這意味着每次我的腳本運行時只會得到一個鍵/值對 - 這並不理想,但這不是什麼大問題,我可以看出這可能是不可避免的。更不好的是它在任何時候只有一個腳本正在運行的實例。設置mapreduce.job.maps並沒有什麼區別(儘管我依稀記得看到這個值只是一個建議,所以也許Hadoop做出了不同的決定)。我錯過了什麼?

這裏是我的代碼: -

#!/bin/bash 

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ 
    -libjars mimi.jar \ 
    -D mapreduce.job.reduces=0 \ 
    -files rawbytes_mapper.py,irrelevant.py \ 
    -inputformat "mimi.WholeFileInputFormat" \ 
    -outputformat "mimi.NamedFileOutputFormat" \ 
    -io rawbytes \ 
    -mapper "rawbytes_mapper.py irrelevant blah blah blah" \ 
    -input "input/*.xml" \ 
    -output output 
#!/usr/bin/python 

def read_raw_bytes(input): 
    length_bytes = input.read(4) 
    if len(length_bytes) < 4: 
     return None 
    length = 0 
    for b in length_bytes: 
     length = (length << 8) + ord(b) 
    return input.read(length) 

def write_raw_bytes(output, s): 
    length = len(s) 
    length_bytes = [] 
    for _ in range(4): 
     length_bytes.append(chr(length & 0xff)) 
     length = length >> 8 
    length_bytes.reverse() 
    for b in length_bytes: 
     output.write(b) 
    output.write(s) 

def read_keys_and_values(input): 
    d = {} 
    while True: 
     key = read_raw_bytes(input) 
     if key is None: break 
     value = read_raw_bytes(input) 
     d[key] = value 
    return d 

def write_keys_and_values(output, d): 
    for key in d: 
     write_raw_bytes(output, key) 
     write_raw_bytes(output, d[key]) 

if __name__ == "__main__": 
    import sys 
    module = __import__(sys.argv[1]) 
    before = read_keys_and_values(sys.stdin) 
    module.init(sys.argv[2:]) 
    after = module.process(before) 
    write_keys_and_values(sys.stdout, after) 
package mimi; 

import java.io.IOException; 
import java.nio.charset.StandardCharsets; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 

public class WholeFileInputFormat extends FileInputFormat<BytesWritable, BytesWritable> 
{ 
    private static class WholeFileRecordReader implements RecordReader<BytesWritable, BytesWritable> 
    { 
     private FileSplit split; 
     private JobConf conf; 
     private boolean processed = false; 

     public WholeFileRecordReader(FileSplit split, JobConf conf) 
     { 
      this.split = split; 
      this.conf = conf; 
     } 

     @Override 
     public BytesWritable createKey() 
     { 
      return new BytesWritable(); 
     } 

     @Override 
     public BytesWritable createValue() 
     { 
      return new BytesWritable(); 
     } 

     @Override 
     public boolean next(BytesWritable key, BytesWritable value) throws IOException 
     { 
      if (processed) 
      { 
       return false; 
      } 

      byte[] contents = new byte[(int) split.getLength()]; 
      Path file = split.getPath(); 
      String name = file.getName(); 
      byte[] bytes = name.getBytes(StandardCharsets.UTF_8); 
      key.set(bytes, 0, bytes.length); 
      FileSystem fs = file.getFileSystem(conf); 
      FSDataInputStream in = null; 
      try 
      { 
       in = fs.open(file); 
       IOUtils.readFully(in, contents, 0, contents.length); 
       value.set(contents, 0, contents.length); 
      } 
      finally 
      { 
       IOUtils.closeStream(in); 
      } 

      processed = true; 
      return true; 
     } 

     @Override 
     public float getProgress() throws IOException 
     { 
      return processed ? 1.0f : 0.0f; 
     } 

     @Override 
     public long getPos() throws IOException 
     { 
      return processed ? 0l : split.getLength(); 
     } 

     @Override 
     public void close() throws IOException 
     { 
      // do nothing 
     } 
    } 

    @Override 
    protected boolean isSplitable(FileSystem fs, Path file) 
    { 
     return false; 
    } 

    @Override 
    public RecordReader<BytesWritable, BytesWritable> getRecordReader(InputSplit split, 
                     JobConf conf, 
                     Reporter reporter) 
    throws IOException 
    { 
     return new WholeFileRecordReader((FileSplit) split, conf); 
    } 
} 
package mimi; 

import java.io.IOException; 
import java.nio.charset.StandardCharsets; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.mapred.lib.MultipleOutputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordWriter; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.util.Progressable; 

public class NamedFileOutputFormat extends MultipleOutputFormat<BytesWritable, BytesWritable> 
{ 
    private static class BytesValueWriter implements RecordWriter<BytesWritable, BytesWritable> 
    { 
     FSDataOutputStream out; 

     BytesValueWriter(FSDataOutputStream out) 
     { 
      this.out = out; 
     } 

     @Override 
     public synchronized void write(BytesWritable key, BytesWritable value) throws IOException 
     { 
      out.write(value.getBytes(), 0, value.getLength()); 
     } 

     @Override 
     public void close(Reporter reporter) throws IOException 
     { 
      out.close(); 
     } 
    } 

    @Override 
    protected String generateFileNameForKeyValue(BytesWritable key, BytesWritable value, String name) 
    { 
     return new String(key.getBytes(), 0, key.getLength(), StandardCharsets.UTF_8); 
    } 

    @Override 
    public RecordWriter<BytesWritable, BytesWritable> getBaseRecordWriter(FileSystem ignored, 
                      JobConf conf, 
                      String name, 
                      Progressable progress) 
    throws IOException 
    { 
     Path file = FileOutputFormat.getTaskOutputPath(conf, name); 
     FileSystem fs = file.getFileSystem(conf); 
     FSDataOutputStream out = fs.create(file, progress); 
     return new BytesValueWriter(out); 
    } 
} 

回答

0

我想我可以幫你解決問題的這部分:

每次我的腳本運行它只有一個鍵/值對 - 這不是理想的

如果 isSplitable方法返回false,每個映射器只處理一個文件。因此,如果您不覆蓋 isSplitable方法並將其保留返回true您應該在一個映射器中具有多個鍵/值對。在你的情況下,每個文件都是一個鍵/值對,因此即使 isSplitable返回true時也不能拆分它們。

我不明白爲什麼只有一個映射器在一次啓動,但我仍在考慮它:)

相關問題