
MapReduce job taking too long to complete

We wrote a MapReduce job to process log files. At the moment we have roughly 52 GB of input, but processing it takes about an hour. By default the job creates only a single reducer task, and we regularly see timeout errors in the reduce task, after which it restarts and eventually completes. Below are the counters from a successful run. Please let us know how we can improve the performance.

File System Counters 
      FILE: Number of bytes read=876100387 
      FILE: Number of bytes written=1767603407 
      FILE: Number of read operations=0 
      FILE: Number of large read operations=0 
      FILE: Number of write operations=0 
      HDFS: Number of bytes read=52222279591 
      HDFS: Number of bytes written=707429882 
      HDFS: Number of read operations=351 
      HDFS: Number of large read operations=0 
      HDFS: Number of write operations=2 
    Job Counters 
      Failed reduce tasks=1 
      Launched map tasks=116 
      Launched reduce tasks=2 
      Other local map tasks=116 
      Total time spent by all maps in occupied slots (ms)=9118125 
      Total time spent by all reduces in occupied slots (ms)=7083783 
      Total time spent by all map tasks (ms)=3039375 
      Total time spent by all reduce tasks (ms)=2361261 
      Total vcore-seconds taken by all map tasks=3039375 
      Total vcore-seconds taken by all reduce tasks=2361261 
      Total megabyte-seconds taken by all map tasks=25676640000 
      Total megabyte-seconds taken by all reduce tasks=20552415744 
    Map-Reduce Framework 
      Map input records=49452982 
      Map output records=5730971 
      Map output bytes=864140911 
      Map output materialized bytes=876101077 
      Input split bytes=13922 
      Combine input records=0 
      Combine output records=0 
      Reduce input groups=1082133 
      Reduce shuffle bytes=876101077 
      Reduce input records=5730971 
      Reduce output records=5730971 
      Spilled Records=11461942 
      Shuffled Maps =116 
      Failed Shuffles=0 
      Merged Map outputs=116 
      GC time elapsed (ms)=190633 
      CPU time spent (ms)=4536110 
      Physical memory (bytes) snapshot=340458307584 
      Virtual memory (bytes) snapshot=1082745069568 
      Total committed heap usage (bytes)=378565820416 
    Shuffle Errors 
      BAD_ID=0 
      CONNECTION=0 
      IO_ERROR=0 
      WRONG_LENGTH=0 
      WRONG_MAP=0 
      WRONG_REDUCE=0 
    File Input Format Counters 
      Bytes Read=52222265669 
    File Output Format Counters 
      Bytes Written=707429882 
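
For reference, the single reducer is a job-configuration setting rather than something forced by the data; the reduce-task count is set on the Job before submission. Below is a minimal driver sketch, not the original job's driver: the class name, the reducer count of 8, and the omission of the job's custom input format, reducer, and final output classes are all assumptions made for illustration.

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

// Hypothetical driver: only the settings relevant to parallelising the reduce 
// phase are shown; the job's custom input format, reducer, and final output 
// classes are not known from the question and are left out. 
public class ExchgLogsDriver { 
    public static void main(String[] args) throws Exception { 
        Job job = Job.getInstance(new Configuration(), "exchange-log-processing"); 
        job.setJarByClass(ExchgLogsDriver.class); 
        job.setMapperClass(ExchgLogsMapper.class); 
        job.setPartitionerClass(ActualKeyPartitioner.class); 
        job.setMapOutputKeyClass(CompositeKey.class); 
        job.setMapOutputValueClass(CompositeWritable.class); 

        // Ask for several reduce tasks instead of the default single one, 
        // so the ~876 MB shuffle is spread across the cluster. 
        job.setNumReduceTasks(8); 

        FileInputFormat.addInputPath(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 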

If I increase the number of reducers I get the ClassCastException below. I suspect the problem comes from the partitioner class.

java.lang.Exception: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text 
    at com.emaar.bigdata.exchg.logs.ActualKeyPartitioner.getPartition(ActualKeyPartitioner.java:1) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:716) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:56) 
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:1) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 

My partitioner class:

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> { 

    HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>(); 
    Text newKey = new Text(); 

    @Override 
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) { 
        try { 
            // Run the default hash partitioner over the first part of the composite key 
            newKey.set(key.getSubject()); 
            return hashPartitioner.getPartition(newKey, value, numReduceTasks); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            // Fall back to a random partition in the range [0, numReduceTasks) 
            return (int) (Math.random() * numReduceTasks); 
        } 
    } 
} 
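
One thing worth noting about the exception above: the mapper writes CompositeWritable values (its declared output value type is Writable), while the partitioner's value type parameter is Text. With more than one reducer the framework passes each map output value into getPartition, and the compiler-generated bridge method casts that value to Text, which is exactly the cast that fails. Below is a sketch of the same partitioner with the value type matched to the map output, assuming CompositeKey.getSubject() and CompositeWritable behave as in the code shown in the question.

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 

// Sketch: same partitioning logic, but the value type parameter matches the 
// mapper's actual output value class, so no Text cast is generated. 
public class ActualKeyPartitioner extends Partitioner<CompositeKey, CompositeWritable> { 

    private final HashPartitioner<Text, CompositeWritable> hashPartitioner = 
            new HashPartitioner<Text, CompositeWritable>(); 
    private final Text newKey = new Text(); 

    @Override 
    public int getPartition(CompositeKey key, CompositeWritable value, int numReduceTasks) { 
        // Partition on the subject part of the composite key only, so all 
        // records for the same subject reach the same reducer. 
        newKey.set(key.getSubject()); 
        return hashPartitioner.getPartition(newKey, value, numReduceTasks); 
    } 
} 

The map output value class would also need to be set on the job (for example job.setMapOutputValueClass(CompositeWritable.class)) so that the declared types and the objects being shuffled agree.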

Mapper code:

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.Mapper; 

public class ExchgLogsMapper extends Mapper<LongWritable, List<Text>, CompositeKey, Writable> { 

    String recepientAddresses = ""; 
    public static final String DELIVER = "DELIVER"; 
    public static final String RESOLVED = "Resolved"; 
    public static final String JUNK = "Junk E-mail"; 
    public static final String SEMICOLON = ";"; 
    public static final String FW1 = "FW: "; 
    public static final String FW2 = "Fw: "; 
    public static final String FW3 = "FWD: "; 
    public static final String FW4 = "Fwd: "; 
    public static final String FW5 = "fwd: "; 
    public static final String RE1 = "RE: "; 
    public static final String RE2 = "Re: "; 
    public static final String RE3 = "re: "; 

    Text mailType = new Text("NEW"); 
    Text fwType = new Text("FW"); 
    Text reType = new Text("RE"); 
    Text recepientAddr = new Text(); 

    @Override 
    public void map(LongWritable key, List<Text> values, Context context) 
            throws IOException, InterruptedException { 
        String subj = null; 
        int lstSize = values.size(); 
        if (lstSize >= 26) { 
            if (values.get(8).toString().equals(DELIVER)) { 
                if (!(ExclusionList.exclusions.contains(values.get(18).toString()))) { 
                    if (!(JUNK.equals(values.get(12).toString()))) { 
                        subj = values.get(17).toString(); 
                        recepientAddresses = values.get(11).toString(); 
                        String[] recepientAddressArr = recepientAddresses.split(SEMICOLON); 
                        if (subj.startsWith(FW1) || subj.startsWith(FW2) || subj.startsWith(FW3) 
                                || subj.startsWith(FW4) || subj.startsWith(FW5)) { 
                            mailType = fwType; 
                            subj = subj.substring(4); 
                        } else if (subj.startsWith(RE1) || subj.startsWith(RE2) || subj.startsWith(RE3)) { 
                            mailType = reType; 
                            subj = subj.substring(4); 
                        } 
                        for (int i = 0; i < recepientAddressArr.length; i++) { 
                            CompositeKey ckey = new CompositeKey(subj, values.get(0).toString()); 
                            recepientAddr.set(recepientAddressArr[i]); 
                            CompositeWritable out = new CompositeWritable(mailType, recepientAddr, 
                                    values.get(18), values.get(0)); 
                            context.write(ckey, out); 
                            // System.err.println(out); 
                        } 
                    } 
                } 
            } 
        } 
    } 
} 

What is preventing you from increasing the number of reducers? –


I get a ClassCastException as soon as I increase the number of reducers. I suspect the problem comes from the partitioner class. I have edited the question with more details. –


Is key.getSubject a Text? Instead of creating a HashPartitioner instance you could try return (key.getSubject().hashCode() & Integer.MAX_VALUE) % numReduceTasks; this is the code from the HashPartitioner implementation. –
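
Sketched against the corrected value type discussed earlier, that suggestion would look roughly like this; it drops the HashPartitioner instance but still partitions on the subject alone (assuming getSubject() is non-null for every key):

@Override 
public int getPartition(CompositeKey key, CompositeWritable value, int numReduceTasks) { 
    // Same computation HashPartitioner performs, applied to the subject only. 
    return (key.getSubject().hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
} 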

Answer


There were a number of sysouts inside a loop in the reducer code that were writing huge amounts of log output; after removing them, the reducer completed within a few minutes.
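
For illustration, here is a sketch of the kind of change the answer describes. The reducer below is hypothetical (the class name and output types are assumptions, not code from the job): the per-record System.err.println inside the value loop is removed, and a counter is incremented instead so the record volume is still visible in the job counters without writing gigabytes of task logs.

import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

// Hypothetical reducer illustrating the change described in the answer. 
public class ExchgLogsReducer extends Reducer<CompositeKey, CompositeWritable, Text, Text> { 

    private final Text outKey = new Text(); 
    private final Text outValue = new Text(); 

    @Override 
    protected void reduce(CompositeKey key, Iterable<CompositeWritable> values, Context context) 
            throws IOException, InterruptedException { 
        for (CompositeWritable value : values) { 
            // System.err.println(value);  // removed: per-record logging dominated the run time 
            context.getCounter("ExchgLogs", "ReduceOutputRecords").increment(1); 
            outKey.set(key.getSubject()); 
            outValue.set(value.toString()); 
            context.write(outKey, outValue); 
        } 
    } 
} 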