我剛開始學習Hadoop。我試圖將流式接口與處理文件的Python腳本一起使用:對於每個輸入文件,我都會創建一個輸出文件並提供關於它的一些信息,所以這是一個沒有縮減器的映射作業。我發現的是,文件正在一次處理一個,這不是我想要的。Hadoop Streaming作業中並行映射器任務的數量
我會解釋我所做的事情,但之後我還會發布一些代碼以防萬一我在那裏丟失了某些東西。
我有一個輸入格式和記錄閱讀器,它讀取整個文件並將其內容用作值和文件名作爲鍵。 (這些文件並不是很大。)另一方面,我有一個輸出格式和記錄寫入器,它將值寫入基於密鑰名稱的文件中。我正在使用-io rawbytes
,我的Python腳本知道如何讀取和寫入鍵/值對。
這一切工作正常,在生產我期待的產出方面。如果我用例如10個輸入文件運行,我會得到10個分割。這意味着每次我的腳本運行時只會得到一個鍵/值對 - 這並不理想,但這不是什麼大問題,我可以看出這可能是不可避免的。更不好的是它在任何時候只有一個腳本正在運行的實例。設置mapreduce.job.maps並沒有什麼區別(儘管我依稀記得看到這個值只是一個建議,所以也許Hadoop做出了不同的決定)。我錯過了什麼?
這裏是我的代碼: -
#!/bin/bash
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-libjars mimi.jar \
-D mapreduce.job.reduces=0 \
-files rawbytes_mapper.py,irrelevant.py \
-inputformat "mimi.WholeFileInputFormat" \
-outputformat "mimi.NamedFileOutputFormat" \
-io rawbytes \
-mapper "rawbytes_mapper.py irrelevant blah blah blah" \
-input "input/*.xml" \
-output output
#!/usr/bin/python
def read_raw_bytes(input):
length_bytes = input.read(4)
if len(length_bytes) < 4:
return None
length = 0
for b in length_bytes:
length = (length << 8) + ord(b)
return input.read(length)
def write_raw_bytes(output, s):
length = len(s)
length_bytes = []
for _ in range(4):
length_bytes.append(chr(length & 0xff))
length = length >> 8
length_bytes.reverse()
for b in length_bytes:
output.write(b)
output.write(s)
def read_keys_and_values(input):
d = {}
while True:
key = read_raw_bytes(input)
if key is None: break
value = read_raw_bytes(input)
d[key] = value
return d
def write_keys_and_values(output, d):
for key in d:
write_raw_bytes(output, key)
write_raw_bytes(output, d[key])
if __name__ == "__main__":
import sys
module = __import__(sys.argv[1])
before = read_keys_and_values(sys.stdin)
module.init(sys.argv[2:])
after = module.process(before)
write_keys_and_values(sys.stdout, after)
package mimi;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class WholeFileInputFormat extends FileInputFormat<BytesWritable, BytesWritable>
{
private static class WholeFileRecordReader implements RecordReader<BytesWritable, BytesWritable>
{
private FileSplit split;
private JobConf conf;
private boolean processed = false;
public WholeFileRecordReader(FileSplit split, JobConf conf)
{
this.split = split;
this.conf = conf;
}
@Override
public BytesWritable createKey()
{
return new BytesWritable();
}
@Override
public BytesWritable createValue()
{
return new BytesWritable();
}
@Override
public boolean next(BytesWritable key, BytesWritable value) throws IOException
{
if (processed)
{
return false;
}
byte[] contents = new byte[(int) split.getLength()];
Path file = split.getPath();
String name = file.getName();
byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
key.set(bytes, 0, bytes.length);
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try
{
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
}
finally
{
IOUtils.closeStream(in);
}
processed = true;
return true;
}
@Override
public float getProgress() throws IOException
{
return processed ? 1.0f : 0.0f;
}
@Override
public long getPos() throws IOException
{
return processed ? 0l : split.getLength();
}
@Override
public void close() throws IOException
{
// do nothing
}
}
@Override
protected boolean isSplitable(FileSystem fs, Path file)
{
return false;
}
@Override
public RecordReader<BytesWritable, BytesWritable> getRecordReader(InputSplit split,
JobConf conf,
Reporter reporter)
throws IOException
{
return new WholeFileRecordReader((FileSplit) split, conf);
}
}
package mimi;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
public class NamedFileOutputFormat extends MultipleOutputFormat<BytesWritable, BytesWritable>
{
private static class BytesValueWriter implements RecordWriter<BytesWritable, BytesWritable>
{
FSDataOutputStream out;
BytesValueWriter(FSDataOutputStream out)
{
this.out = out;
}
@Override
public synchronized void write(BytesWritable key, BytesWritable value) throws IOException
{
out.write(value.getBytes(), 0, value.getLength());
}
@Override
public void close(Reporter reporter) throws IOException
{
out.close();
}
}
@Override
protected String generateFileNameForKeyValue(BytesWritable key, BytesWritable value, String name)
{
return new String(key.getBytes(), 0, key.getLength(), StandardCharsets.UTF_8);
}
@Override
public RecordWriter<BytesWritable, BytesWritable> getBaseRecordWriter(FileSystem ignored,
JobConf conf,
String name,
Progressable progress)
throws IOException
{
Path file = FileOutputFormat.getTaskOutputPath(conf, name);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream out = fs.create(file, progress);
return new BytesValueWriter(out);
}
}