Generating a file and writing to it in the map function

I am generating a CSV file in my map function, so that each map task produces one CSV file. This is a side effect, not the mapper's output. I name these files like filename_inputkey. However, when I run the application on a single-node cluster, only one file is generated. There are 10 rows in my input, and my understanding is that there would be 10 mapper tasks and 10 files generated. Please let me know if I am thinking about this the wrong way.

Here is my GWASInputFormat class:

import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 

public class GWASInputFormat extends FileInputFormat<LongWritable, GWASGenotypeBean> { 

@Override 
public RecordReader<LongWritable, GWASGenotypeBean> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException { 
    return new GWASRecordReader(job, (FileSplit) input); 
} 

} 

Below is the GWASRecordReader:

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.LineRecordReader; 
import org.apache.hadoop.mapred.RecordReader; 

public class GWASRecordReader implements RecordReader<LongWritable, GWASGenotypeBean>{ 

private LineRecordReader lineReader; 
private LongWritable lineKey; 
private Text lineValue; 

@Override 
public void close() throws IOException { 
    if(lineReader != null) { 
     lineReader.close(); 
    } 
} 

public GWASRecordReader(JobConf job, FileSplit split) throws IOException { 
    lineReader = new LineRecordReader(job, split); 
    lineKey = lineReader.createKey(); 
    lineValue = lineReader.createValue(); 
} 

@Override 
public LongWritable createKey() { 
    return new LongWritable(); 
} 

@Override 
public GWASGenotypeBean createValue() { 
    return new GWASGenotypeBean(); 
} 

@Override 
public long getPos() throws IOException { 
    return lineReader.getPos(); 
} 

@Override 
public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException { 
    if(!lineReader.next(lineKey, lineValue)){ 
     return false; 
    } 

    String[] values = lineValue.toString().split(","); 

    if(values.length !=32) { 
     throw new IOException("Invalid Record "); 
    } 

    value.setPROJECT_NAME(values[0]); 
    value.setRESEARCH_CODE(values[1]); 
    value.setFACILITY_CODE(values[2]); 
    value.setPROJECT_CODE(values[3]); 
    value.setINVESTIGATOR(values[4]); 
    value.setPATIENT_NUMBER(values[5]); 
    value.setSAMPLE_COLLECTION_DATE(values[6]); 
    value.setGENE_NAME(values[7]); 
    value.setDbSNP_RefSNP_ID(values[8]); 
    value.setSNP_ID(values[9]); 
    value.setALT_SNP_ID(values[10]); 
    value.setSTRAND(values[11]); 
    value.setASSAY_PLATFORM(values[12]); 
    value.setSOFTWARE_NAME(values[13]); 
    value.setSOFTWARE_VERSION_NUMBER(values[14]); 
    value.setTEST_DATE(values[15]); 
    value.setPLATE_POSITION(values[16]); 
    value.setPLATE_ID(values[17]); 
    value.setOPERATOR(values[18]); 
    value.setGENOTYPE(values[19]); 
    value.setGENOTYPE_QS1_NAME(values[20]); 
    value.setGENOTYPE_QS2_NAME(values[21]); 
    value.setGENOTYPE_QS3_NAME(values[22]); 
    value.setGENOTYPE_QS4_NAME(values[23]); 
    value.setGENOTYPE_QS5_NAME(values[24]); 
    value.setGENOTYPE_QS1_RESULT(values[25]); 
    value.setGENOTYPE_QS2_RESULT(values[26]); 
    value.setGENOTYPE_QS3_RESULT(values[27]); 
    value.setGENOTYPE_QS4_RESULT(values[28]); 
    value.setGENOTYPE_QS5_RESULT(values[29]); 
    value.setSTAGE(values[30]); 
    value.setLAB(values[31]); 
    return true; 
} 

@Override 
public float getProgress() throws IOException { 
    return lineReader.getProgress(); 
} 

} 

And the Mapper class:

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Map; 
import java.util.TreeMap; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 

import com.google.common.base.Strings; 

public class GWASMapper extends MapReduceBase implements Mapper<LongWritable, GWASGenotypeBean, Text, Text> { 

private static Configuration conf; 

@SuppressWarnings("rawtypes") 
public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException { 
    // Note: this signature takes the new-API (mapreduce) Context, but the class 
    // extends the old-API MapReduceBase, so the framework never invokes this method; 
    // conf stays null until the fallback in the write methods below. 
    conf = context.getConfiguration(); 
    // Path[] otherFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
} 


@Override 
public void map(LongWritable inputKey, GWASGenotypeBean inputValue, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

    checkForNulls(inputValue, inputKey.toString()); 

    output.collect(new Text(inputValue.getPROJECT_CODE()), new Text(inputValue.getFACILITY_CODE())); 
} 

private void checkForNulls(GWASGenotypeBean user, String inputKey) { 

    String f1 = " does not have a value_fail"; 
    String p1 = "Must not contain NULLS for required fields"; 
    // have to initialize these two to some paths in hdfs 

    String edtChkRptDtl = "/user/hduser/output6/detail" + inputKey + ".csv"; 
    String edtChkRptSmry = "/user/hduser/output6/summary" + inputKey + ".csv"; 

    List<String> errSmry = new ArrayList<String>(); 
    Map<String, String> loc = new TreeMap<String, String>(); 

    if(Strings.isNullOrEmpty(user.getPROJECT_NAME())) { 
     loc.put("test", "PROJECT_NAME "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getRESEARCH_CODE())) { 
     loc.put("test", "RESEARCH_CODE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getFACILITY_CODE())) { 
     loc.put("test", "FACILITY_CODE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getPROJECT_CODE())) { 
     loc.put("test", "PROJECT_CODE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getINVESTIGATOR())) { 
     loc.put("test", "INVESTIGATOR "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getPATIENT_NUMBER())) { 
     loc.put("test", "PATIENT_NUMBER "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getSAMPLE_COLLECTION_DATE())) { 
     loc.put("test", "SAMPLE_COLLECTION_DATE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getGENE_NAME())) { 
     loc.put("test", "GENE_NAME "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getSTRAND())) { 
     loc.put("test", "STRAND "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getASSAY_PLATFORM())) { 
     loc.put("test", "ASSAY_PLATFORM "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getSOFTWARE_NAME())) { 
     loc.put("test", "SOFTWARE_NAME "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getTEST_DATE())) { 
     loc.put("test", "TEST_DATE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getPLATE_POSITION())) { 
     loc.put("test", "PLATE_POSITION "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getPLATE_ID())) { 
     loc.put("test", "PLATE_ID "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getOPERATOR())) { 
     loc.put("test", "OPERATOR "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getGENOTYPE())) { 
     loc.put("test", "GENOTYPE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getSTAGE())) { 
     loc.put("test", "STAGE "); 
     errSmry.add("_fail"); 
    } else if(Strings.isNullOrEmpty(user.getLAB())) { 
     loc.put("test", "LAB "); 
     errSmry.add("_fail"); 
    } 

    String customNullMsg = "Required Genotype column(s)"; 
    List<String> error = new ArrayList<String>(); 
    String message = null; 

    if(!loc.isEmpty()) { 
     for (Map.Entry<String, String> entry : loc.entrySet()) { 
     message = "line:" + entry.getKey() + " column:" + entry.getValue() + " " + f1; 
     error.add(message); 
     } 
    } else { 
     message = "_pass"; 
     error.add(message); 
    } 

    int cnt = 0; 
    if(!errSmry.isEmpty()) { 

     // not able to understand this. Are we trying to get the occurrences 
     // of the last key that contains _fail? 
     for (String key : errSmry) { 
     if(key.contains("_fail")) { 
      cnt = Collections.frequency(errSmry, key); 
      // ******************** Nikhil added this 
      break; 
     } 
     } 

     if(cnt > 0) { 
     writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "failed", Integer.toString(cnt)); 
     } else { 
     writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0"); 
     } 

    } else { 
     writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0"); 
    } 

    // loop the list and write out items to the error report file 
    if(!error.isEmpty()) { 
     for (String s : error) { 
     //System.out.println(s); 
     if(s.contains("_fail")) { 
      String updatedFailmsg = s.replace("_fail", ""); 
      writeCsvFileDtl(edtChkRptDtl, "genotype", updatedFailmsg, "failed"); 
     } 
     if(s.contains("_pass")) { 
      writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed"); 
     } 
     } 
    } else { 
     writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed"); 
    } 
    // end loop 
    } 

private void writeCsvFileDtl(String edtChkRptDtl, String col1, String col2, String col3) { 
    try { 
     if(conf == null) { 
      conf = new Configuration(); 
     } 
     FileSystem fs = FileSystem.get(conf); 

     Path path = new Path(edtChkRptDtl); 
     if (!fs.exists(path)) { 
      FSDataOutputStream out = fs.create(path); 
      out.writeChars(col1); 
      out.writeChar(','); 
      out.writeChars(col2); 
      out.writeChar(','); 
      out.writeChars(col3); 
      out.writeChar('\n'); 
      out.flush(); 
      out.close(); 
     } 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

private void writeCsvFileSmry(String edtChkRptSmry, String col1, String col2, String col3, String col4) { 
    try { 
     if(conf == null) { 
      conf = new Configuration(); 
     } 
     FileSystem fs = FileSystem.get(conf); 

     Path path = new Path(edtChkRptSmry); 
     if (!fs.exists(path)) { 
      FSDataOutputStream out = fs.create(path); 
      out.writeChars(col1); 
      out.writeChar(','); 
      out.writeChars(col2); 
      out.writeChar(','); 
      out.writeChars(col3); 
      out.writeChar(','); 
      out.writeChars(col4); 
      out.writeChar('\n'); 
      out.flush(); 
      out.close(); 
     } 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 
} 

And here is my driver class:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class GWASMapReduce extends Configured implements Tool{ 

/** 
* @param args 
*/ 
public static void main(String[] args) throws Exception { 
    Configuration configuration = new Configuration(); 
    ToolRunner.run(configuration, new GWASMapReduce(), args); 
} 

@Override 
public int run(String[] arg0) throws Exception { 

    JobConf conf = new JobConf(new Configuration()); 
    conf.setInputFormat(GWASInputFormat.class); 
    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(Text.class); 
    conf.setJarByClass(GWASMapReduce.class); 
    conf.setMapperClass(GWASMapper.class); 
    conf.setNumReduceTasks(0); 
    FileInputFormat.addInputPath(conf, new Path(arg0[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(arg0[1])); 
    JobClient.runJob(conf); 
    return 0; 
} 
} 

Answer


There is probably only one Mapper task, and its map method is called 10 times (once per input record). If you want each mapper to write out one file, you should do that in its configure method. If you want to write out one file for every input record, you should do that in its map method.
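
For illustration, a minimal sketch of the two options under the old mapred API (the class name SideEffectFileMapper and the /tmp/sideeffect path are made up for this example):

import java.io.IOException; 

import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 

public class SideEffectFileMapper extends MapReduceBase 
        implements Mapper<LongWritable, Text, Text, Text> { 

    private JobConf job; 

    @Override 
    public void configure(JobConf job) { 
        // Called once per map task: create a single per-mapper file here 
        // if one file per mapper is the desired granularity. 
        this.job = job; 
    } 

    @Override 
    public void map(LongWritable key, Text value, 
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 
        // Called once per input record: create one file per record here. 
        // The byte offset in 'key' keeps the file names distinct. 
        FileSystem fs = FileSystem.get(job); 
        Path p = new Path("/tmp/sideeffect/record_" + key.get() + ".csv"); 
        FSDataOutputStream out = fs.create(p); 
        out.writeChars(value.toString()); 
        out.close(); 
        output.collect(new Text("offset_" + key.get()), value); 
    } 
} 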

Edit: the above turned out to be unrelated to the problem. The problem is that in GWASInputFormat you never set the key in the next method, so your map input key is always the same. Just add key.set(lineKey.get()); to the next method and it should work.
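
Applied to the GWASRecordReader above, the corrected next method begins like this (the bean-populating assignments are unchanged and elided):

@Override 
public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException { 
    if(!lineReader.next(lineKey, lineValue)) { 
     return false; 
    } 
    // Propagate the byte offset from the wrapped LineRecordReader so every 
    // record reaches the mapper with a distinct key. 
    key.set(lineKey.get()); 

    String[] values = lineValue.toString().split(","); 
    // ... populate the bean fields from values[] exactly as before ... 
    return true; 
} 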


The Mapper gets only one input record. So I don't have to do anything special in the mapper to generate one file per record? – user1707141


Are you sure the Mapper gets only one input record? That is not how MapReduce normally works; you would have to arrange that specifically in your GWASInputFormat. You don't get one map task per input record; you get one map task per input split, and each input split can contain many records. But if you want to create a file for every record, you can do it directly in the 'map' method. If you are doing that and still seeing only one file, make sure you are using different file names and not overwriting the previous output. –
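
For illustration, once the key fix is in place, a per-record file name inside map could be derived like this (the path prefix mirrors the one used in the question):

// inputKey now carries the record's byte offset, which differs for every 
// line of the input, so each record writes to its own pair of files. 
String edtChkRptDtl = "/user/hduser/output6/detail" + inputKey.get() + ".csv"; 
String edtChkRptSmry = "/user/hduser/output6/summary" + inputKey.get() + ".csv"; 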


Yes, I have updated the question with the GWASInputFormat. So each split contains only one record. Also, I am using different file names in the mapper. I have put my Mapper class in the question as well. – user1707141
