Hadoop 2: Empty results when using a custom InputFormat

I want to read CSV data into <Long, String> pairs using my own FileInputFormat with a custom RecordReader.

So I created the class MyTextInputFormat:

import java.io.IOException; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 

public class MyTextInputFormat extends FileInputFormat<Long, String> {

    @Override
    public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new MyStringRecordReader(job, (FileSplit) input);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return super.isSplitable(fs, filename);
    }
}

and the class MyStringRecordReader:

import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.LineRecordReader; 
import org.apache.hadoop.mapred.RecordReader; 

public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);

        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();

        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");

        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }

        key = lineKey.get();
        value = lineValue.toString();

        System.out.println(key);
        System.out.println(value);

        return true;
    }
}

In my Spark application I read the file by calling the sparkContext.hadoopFile method. But I only get empty output from the following code:

import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class AssociationRulesAnalysis {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        // sc is the application's JavaSparkContext; inputFilePath points to the CSV file
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long, String>, String>() {
            @Override
            public String call(Tuple2<Long, String> arg0) throws Exception {
                System.out.println("map: " + arg0._2());
                return arg0._2();
            }
        });

        List<String> asList = inputRdd.take(10);
        for (String s : asList) {
            System.out.println(s);
        }
    }
}

I only get 10 empty lines back from the RDD.

With the added print statements, the console output looks as follows:

=== APP STARTED : local-1467182320798 
constructor called 
createValue called 
next called 
0 
ä1 
map: 
next called 
8 
ö2 
map: 
next called 
13 
ü3 
map: 
next called 
18 
ß4 
map: 
next called 
23 
ä5 
map: 
next called 
28 
ö6 
map: 
next called 
33 
ü7 
map: 
next called 
38 
ß8 
map: 
next called 
43 
ä9 
map: 
next called 
48 
ü10 
map: 
next called 
54 
ä11 
map: 
next called 
60 
ß12 
map: 
next called 
12 
===================== 
constructor called 
createValue called 
next called 
0 
ä1 
map: 
next called 
8 
ö2 
map: 
next called 
13 
ü3 
map: 
next called 
18 
ß4 
map: 
next called 
23 
ä5 
map: 
next called 
28 
ö6 
map: 
next called 
33 
ü7 
map: 
next called 
38 
ß8 
map: 
next called 
43 
ä9 
map: 
next called 
48 
ü10 
map: 










Stopping... 

(The RDD data is printed below the ===== in the output above: the 10 empty lines!!! The output above the ===== seems to come from an RDD.count call.) Inside the next method the correct keys & values are displayed!? What am I doing wrong?

Answer

lineKey and lineValue are never copied into the key and value that are passed into the overridden next method of your MyStringRecordReader. Hence it always shows EMPTY strings when you try to use your RecordReader. If you want different keys and values for the records in the file, you need to use the key and value passed into the next method and initialize them with the computed key and value. If you are not planning to change the key/value records, then get rid of the following; every time this code executes, you overwrite the key/value read from the file with an EMPTY string and 0L.

key = lineKey.get(); 
value = lineValue.toString(); 
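
For reference, a minimal sketch of a reader that satisfies this contract. It is not from the original answer: it assumes the usual fix of switching to Hadoop's mutable LongWritable/Text types (the class name MyWritableRecordReader is hypothetical). Because the framework keeps passing the same two objects into next, the reader must write into them in place, which Java's immutable Long and String cannot support:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

// Hypothetical corrected reader: delegates to LineRecordReader and fills
// the framework-supplied key/value objects in place instead of rebinding
// local parameter variables.
public class MyWritableRecordReader implements RecordReader<LongWritable, Text> {

    private final LineRecordReader lineReader;

    public MyWritableRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
    }

    @Override
    public LongWritable createKey() {
        return lineReader.createKey();
    }

    @Override
    public Text createValue() {
        return lineReader.createValue();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        // Reads the next line directly into the objects the caller handed us.
        return lineReader.next(key, value);
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}

The matching InputFormat would then extend FileInputFormat<LongWritable, Text>, and the Spark call would read sc.hadoopFile(inputFilePath, MyTextInputFormat.class, LongWritable.class, Text.class); the existing map to arg0._2().toString() then materializes the desired strings.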

I changed the contents of the 'createKey' and 'createValue' methods to the 'key = lineKey.get()' and 'value = lineValue.toString()' lines above, without success; unfortunately I still get 10 empty lines. –


Changed this in the code snippet of the question above –


createKey and createValue are there to create the appropriate key and value objects. These APIs do not assign the data values from the file to the key and value; that is the responsibility of the 'next' implementation. Your 'next' implementation overwrites the key and value. Try printing out the key and value inside the next method and you will see what I mean. – Amit
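
To illustrate the contract described in this last comment, here is a simplified sketch of how the old mapred API consumes a RecordReader (illustrative only, not actual Hadoop source; the class name ReaderContractDemo and the printing are made up):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

public class ReaderContractDemo {
    // Drives a reader the way the framework does: the key/value objects are
    // created ONCE, and next() is expected to overwrite their contents on
    // every call. Rebinding the parameters inside next(), as the question's
    // code does, never reaches these two objects.
    static void drain(RecordReader<LongWritable, Text> reader) throws IOException {
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            System.out.println(key.get() + " -> " + value.toString());
        }
        reader.close();
    }
}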