Hadoop 2: Empty results when using a custom InputFormat

I want to use my own FileInputFormat with a custom RecordReader to read CSV data into <Long, String> pairs. So I created the class MyTextInputFormat:
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyTextInputFormat extends FileInputFormat<Long, String> {

    @Override
    public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new MyStringRecordReader(job, (FileSplit) input);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return super.isSplitable(fs, filename);
    }
}
and the class MyStringRecordReader:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");
        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }
        key = lineKey.get();
        value = lineValue.toString();
        System.out.println(key);
        System.out.println(value);
        return true;
    }
}
In my Spark application I read the file by calling the sparkContext.hadoopFile method. But I only get empty output from the following code:
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class AssociationRulesAnalysis {
    @SuppressWarnings("serial")
    public static void main(String[] args) {
        // sc and inputFilePath are not shown in the question; minimal setup assumed here
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("AssociationRulesAnalysis"));
        String inputFilePath = args[0];
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class)
                .map(new Function<Tuple2<Long, String>, String>() {
                    @Override
                    public String call(Tuple2<Long, String> arg0) throws Exception {
                        System.out.println("map: " + arg0._2());
                        return arg0._2();
                    }
                });
        List<String> asList = inputRdd.take(10);
        for (String s : asList) {
            System.out.println(s);
        }
    }
}
I only get 10 empty lines back from the RDD. With the prints added, the console output looks as follows:
=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
Stopping...
(The RDD data is printed below the =====; the output above the ===== seems to be produced by the RDD.count call.) The RDD gives back 10 empty lines(!!!), even though the next method prints the correct keys & values!? What am I doing wrong?
I changed the contents of the "createKey" and "createValue" methods to the lines "key = lineKey.get()" and "value = lineValue.toString()" shown above, unfortunately without success; I still get 10 empty lines. –
Changed this in the code snippet in the question above –
createKey and createValue are used to create the appropriate key and value objects. These APIs do not assign the data values from the file to the key and value; that is the responsibility of the "next" implementation. Your "next" implementation overwrites the key and value. Try printing out the key and value in the next method and you will see what I mean. – Amit
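A minimal sketch of the fix this comment points toward: Java passes object references by value, so the assignments "key = lineKey.get()" and "value = lineValue.toString()" inside next only rebind local parameters and are invisible to the caller, and the immutable Long/String types cannot be populated in place. Switching the key/value types to Hadoop's mutable LongWritable and Text lets next fill in the objects the framework hands it (the class names mirror the question's code; this is an illustrative rewrite, not the asker's confirmed solution):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// One top-level public class per file, as usual.
public class MyTextInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new MyStringRecordReader(job, (FileSplit) input);
    }
}

public class MyStringRecordReader implements RecordReader<LongWritable, Text> {

    private final LineRecordReader lineReader;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
    }

    @Override
    public LongWritable createKey() {
        // Creates an empty container object only; next() is what fills it with data.
        return lineReader.createKey();
    }

    @Override
    public Text createValue() {
        return lineReader.createValue();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        // Populate the objects the caller passed in; rebinding the parameters
        // (key = ..., value = ...) would not be seen by the caller.
        return lineReader.next(key, value);
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}

The hadoopFile call then changes accordingly. Since Hadoop RecordReaders (and Spark's hadoopFile on top of them) typically reuse the same Writable instances for every record, the value should be copied out, e.g. with toString(), before the tuples are collected:

JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, LongWritable.class, Text.class)
        .map(new Function<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String call(Tuple2<LongWritable, Text> t) throws Exception {
                return t._2().toString(); // copy the reused Text before collecting
            }
        });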