Creating a custom generator Hadoop InputFormat with no input data

I'm trying to create an InputFormat that simply generates data rather than reading it from an external location. It reads from the configuration how much data to generate before shutting down. The goal is to help profile an OutputFormat outside of a test environment. Unfortunately, I can't find any references to using an InputFormat that is essentially a generator.

The InputFormat I have so far is:
public static class GeneratorInputFormat extends InputFormat<LongWritable, LongWritable> {
  @Override
  public RecordReader<LongWritable, LongWritable> createRecordReader(
      InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
    return new GeneratorRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    long splitCount = job.getConfiguration().getLong(SPLITS_COUNT_KEY, 0);
    long splitSize = job.getConfiguration().getLong(SPLITS_SIZE_KEY, 0);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (int i = 0; i < splitCount; i++) {
      splits.add(new TestInputSplit(splitSize));
    }
    return splits;
  }
}

public static class TestInputSplit extends InputSplit {
  private final long size;

  public TestInputSplit(long size) {
    this.size = size;
  }

  @Override
  public long getLength() throws IOException, InterruptedException {
    return size;
  }

  @Override
  public String[] getLocations() throws IOException, InterruptedException {
    return new String[0];
  }
}
The record reader simply generates numbers from 0 up to the input length.
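I haven't included the full record reader, but it looks roughly like this (simplified, imports omitted as above; the field names are just illustrative):

public static class GeneratorRecordReader extends RecordReader<LongWritable, LongWritable> {
  private long length = 0;
  private long current = -1;
  private final LongWritable key = new LongWritable();
  private final LongWritable value = new LongWritable();

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // The only thing taken from the split is how many records to emit.
    length = split.getLength();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    current++;
    if (current >= length) {
      return false;
    }
    key.set(current);
    value.set(current);
    return true;
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public LongWritable getCurrentValue() {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return length == 0 ? 1.0f : Math.min(1.0f, (float) current / length);
  }

  @Override
  public void close() throws IOException {
  }
}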
The error I'm getting is a missing-file exception:
16/11/18 03:28:54 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1479265882561_0037
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:132)
at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:79)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:307)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
at com.gmail.mooman219.cloud.hadoop.WordCountBench.main(WordCountBench.java:208)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
16/11/18 03:28:54 WARN hdfs.DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/hadoop-yarn/staging/root/.staging/job_1479265882561_0037/job.split (inode 34186): File does not exist. Holder DFSClient_NONMAPREDUCE_232487306_1 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3430)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
I find this strange because at no point do I reference any files on the input side. No input files are defined in the first place, so the job shouldn't be trying to read from anywhere.
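In case the job setup matters, the driver is configured roughly like this (a simplified sketch; the output format, output path, and numeric values are only illustrative placeholders, not the exact code):

Configuration conf = new Configuration();
// SPLITS_COUNT_KEY / SPLITS_SIZE_KEY are the same String constants read in
// GeneratorInputFormat.getSplits(); the values below are only examples.
conf.setLong(SPLITS_COUNT_KEY, 4);
conf.setLong(SPLITS_SIZE_KEY, 1000000);

Job job = Job.getInstance(conf, "outputformat-benchmark");
job.setJarByClass(WordCountBench.class);
job.setInputFormatClass(GeneratorInputFormat.class);
job.setNumReduceTasks(0);                  // map-only, default identity mapper
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
// The OutputFormat being profiled goes here; TextOutputFormat is only a stand-in.
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/tmp/generator-out"));
System.exit(job.waitForCompletion(true) ? 0 : 1);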