
I have a scenario where I want to calculate the decline of a value over time, using MapReduce to compute the decline rate.

My input file is a CSV with the format: key,value,timestamp

1,600,2014-01-20 10:20:00 
1,1200,2014-01-20 10:30:00 
... 
2,2400,2014-01-30 11:20:00 
2,3600,2014-01-30 11:30:00 
... 

There can be multiple keys, and each key can have multiple value and timestamp records.

I need to calculate, for each key, the decline of the value over a period of time.

Decline = (V2-V1)/(t2-t1) 

Here, time t is in seconds.
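
For example, for key 1 above, t2 - t1 = 600 seconds, so Decline = (1200 - 600)/600 = 1, which matches the first row of the expected output below.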

My expected output is something like,

1,1 
... 
2,2 
... 

The MR code I wrote looks like this,

import java.io.IOException; 
import java.util.*; 
import java.text.SimpleDateFormat; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class TestMR 
{  
    public static class Map extends Mapper<LongWritable,Text,Text,Text> 
    { 
     public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException 
     { 
      String [] split = line.toString().split(","); 

      long t1 = 0; 
      SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
      try 
      { 
       t1 = df.parse(split[2]).getTime()/1000; 
      } 
      catch (java.text.ParseException e) 
      { 
       System.out.println("Unable to parse date string: " + split[2]); 
      } 

      StringBuffer sb = new StringBuffer(split[1]+","+t1); 

      context.write(new Text(split[0]), new Text(sb.toString())); 
     }   
    } 


    public static class Reduce extends Reducer<Text,Text,Text,Text> 
    { 
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
     { 
      Iterator iter = values.iterator(); 
      while(iter.hasNext()) 
      { 
       String [] tmpBuf_1 = iter.next().toString().split(","); 
       if(tmpBuf_1.length != 2) 
        continue; 
       String v1 = tmpBuf_1[0]; 
       double t1 = Double.parseDouble(tmpBuf_1[1]); 

       if(!iter.hasNext()) 
        break; 

       String [] tmpBuf_2 = iter.next().toString().split(",");  
       if(tmpBuf_2.length != 2) 
        continue; 
       String v2 = tmpBuf_2[0]; 
       double t2 = Double.parseDouble(tmpBuf_2[1]); 

       double vDiff = Double.parseDouble(v2) - Double.parseDouble(v1);  
       double tDiff = t2 - t1; 

       if(tDiff == 0) 
        continue; 

       double declineV = vDiff/tDiff; 

       context.write(key, new Text(String.valueOf(declineV))); 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception 
    { 
     // Get the default configuration object 
     Configuration conf = new Configuration(); 

     // Add resources 
     conf.addResource("hdfs-default.xml"); 
     conf.addResource("hdfs-site.xml"); 
     conf.addResource("mapred-default.xml"); 
     conf.addResource("mapred-site.xml"); 
     conf.set("mapred.job.tracker", "local"); 

     Job job = new Job(conf); 
     job.setJobName("TestMR"); 
     job.setJarByClass(TestMR.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     job.setMapperClass(Map.class); 
     job.setCombinerClass(Reduce.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     TextInputFormat.setInputPaths(job, new Path(args[0])); 
     TextOutputFormat.setOutputPath(job, new Path(args[1])); 

     // Submit the job 
     Date startTime = new Date(); 
     System.out.println("Job started: " + startTime);  
     int exitCode = job.waitForCompletion(true) ? 0 : 1; 

     if(exitCode == 0) 
     {    
      Date end_time = new Date(); 
      System.out.println("Job ended: " + end_time); 
      System.out.println("The job took " + (end_time.getTime() - startTime.getTime())/1000 + " seconds.");      
     } 
     else { 
      System.out.println("Job Failed!!!"); 
     } 

     System.exit(exitCode); 
    } 
} 

I am not getting any output when I run the MR job! Here is the command trace:

Job started: Sat Feb 08 16:36:07 PST 2014 
14/02/08 16:36:07 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id 
14/02/08 16:36:07 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 
14/02/08 16:36:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
14/02/08 16:36:07 INFO input.FileInputFormat: Total input paths to process : 1 
14/02/08 16:36:08 INFO mapred.JobClient: Running job: job_local2110196638_0001 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: OutputCommitter set in config null 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: Waiting for map tasks 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: Starting task: attempt_local2110196638_0001_m_000000_0 
14/02/08 16:36:08 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
14/02/08 16:36:08 INFO util.ProcessTree: setsid exited with exit code 0 
14/02/08 16:36:08 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
14/02/08 16:36:08 INFO mapred.MapTask: Processing split: hdfs://localhost.localdomain:8020/user/cloudera/input.csv:0+33554432 
14/02/08 16:36:08 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
14/02/08 16:36:08 INFO mapred.MapTask: io.sort.mb = 50 
14/02/08 16:36:08 INFO mapred.MapTask: data buffer = 39845888/49807360 
14/02/08 16:36:08 INFO mapred.MapTask: record buffer = 131072/163840 
14/02/08 16:36:09 INFO mapred.JobClient: map 0% reduce 0% 
In MAP!! 
245,1334603716 
14/02/08 16:36:14 INFO mapred.LocalJobRunner: 
14/02/08 16:36:15 INFO mapred.JobClient: map 9% reduce 0% 
14/02/08 16:36:16 INFO mapred.MapTask: Spilling map output: record full = true 
14/02/08 16:36:16 INFO mapred.MapTask: bufstart = 0; bufend = 2620494; bufvoid = 49807360 
14/02/08 16:36:16 INFO mapred.MapTask: kvstart = 0; kvend = 131072; length = 163840 
14/02/08 16:36:16 INFO compress.CodecPool: Got brand-new compressor [.snappy] 
In REDUCE!! 
14/02/08 16:36:17 INFO mapred.LocalJobRunner: 
14/02/08 16:36:17 INFO mapred.LocalJobRunner: 
14/02/08 16:36:17 INFO mapred.MapTask: Starting flush of map output 
14/02/08 16:36:18 INFO mapred.JobClient: map 49% reduce 0% 
14/02/08 16:36:18 INFO mapred.MapTask: Finished spill 0 
14/02/08 16:36:18 INFO mapred.MapTask: Finished spill 1 
14/02/08 16:36:18 INFO mapred.Merger: Merging 2 sorted segments 
14/02/08 16:36:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy] 
14/02/08 16:36:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy] 
14/02/08 16:36:18 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 339763 bytes 
14/02/08 16:36:19 INFO mapred.Task: Task:attempt_local2110196638_0001_m_000000_0 is done. And is in the process of commiting 
14/02/08 16:36:19 INFO mapred.LocalJobRunner: 
14/02/08 16:36:19 INFO mapred.Task: Task 'attempt_local2110196638_0001_m_000000_0' done. 
14/02/08 16:36:19 INFO mapred.LocalJobRunner: Finishing task: attempt_local2110196638_0001_m_000000_0 
14/02/08 16:36:19 INFO mapred.LocalJobRunner: Starting task: attempt_local2110196638_0001_m_000001_0 
14/02/08 16:36:19 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
14/02/08 16:36:19 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
14/02/08 16:36:19 INFO mapred.MapTask: Processing split: hdfs://localhost.localdomain:8020/user/cloudera/input.csv:33554432+13261402 
14/02/08 16:36:19 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
14/02/08 16:36:19 INFO mapred.MapTask: io.sort.mb = 50 
14/02/08 16:36:19 INFO mapred.MapTask: data buffer = 39845888/49807360 
14/02/08 16:36:19 INFO mapred.MapTask: record buffer = 131072/163840 
14/02/08 16:36:20 INFO mapred.JobClient: map 50% reduce 0% 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:20 INFO mapred.MapTask: Starting flush of map output 
14/02/08 16:36:20 INFO mapred.MapTask: Finished spill 0 
14/02/08 16:36:20 INFO mapred.Task: Task:attempt_local2110196638_0001_m_000001_0 is done. And is in the process of commiting 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:20 INFO mapred.Task: Task 'attempt_local2110196638_0001_m_000001_0' done. 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: Finishing task: attempt_local2110196638_0001_m_000001_0 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: Map task executor complete. 
14/02/08 16:36:20 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
14/02/08 16:36:20 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:20 INFO mapred.Merger: Merging 2 sorted segments 
14/02/08 16:36:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 496064 bytes 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:21 INFO mapred.Task: Task:attempt_local2110196638_0001_r_000000_0 is done. And is in the process of commiting 
14/02/08 16:36:21 INFO mapred.LocalJobRunner: 
14/02/08 16:36:21 INFO mapred.Task: Task attempt_local2110196638_0001_r_000000_0 is allowed to commit now 
14/02/08 16:36:21 INFO output.FileOutputCommitter: Saved output of task 'attempt_local2110196638_0001_r_000000_0' to /user/cloudera/output 
14/02/08 16:36:21 INFO mapred.LocalJobRunner: reduce > reduce 
14/02/08 16:36:21 INFO mapred.Task: Task 'attempt_local2110196638_0001_r_000000_0' done. 
14/02/08 16:36:21 INFO mapred.JobClient: map 100% reduce 100% 
14/02/08 16:36:21 INFO mapred.JobClient: Job complete: job_local2110196638_0001 
14/02/08 16:36:21 INFO mapred.JobClient: Counters: 25 
14/02/08 16:36:21 INFO mapred.JobClient: File System Counters 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of bytes read=1541573 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of bytes written=2668157 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of read operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of large read operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of write operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of bytes read=127382708 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of bytes written=0 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of read operations=17 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of large read operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of write operations=4 
14/02/08 16:36:21 INFO mapred.JobClient: Map-Reduce Framework 
14/02/08 16:36:21 INFO mapred.JobClient:  Map input records=419661 
14/02/08 16:36:21 INFO mapred.JobClient:  Map output records=202114 
14/02/08 16:36:21 INFO mapred.JobClient:  Map output bytes=4041067 
14/02/08 16:36:21 INFO mapred.JobClient:  Input split bytes=292 
14/02/08 16:36:21 INFO mapred.JobClient:  Combine input records=202114 
14/02/08 16:36:21 INFO mapred.JobClient:  Combine output records=95846 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce input groups=43 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce shuffle bytes=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce input records=95846 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce output records=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Spilled Records=259510 
14/02/08 16:36:21 INFO mapred.JobClient:  CPU time spent (ms)=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Total committed heap usage (bytes)=376516608 
Job ended: Sat Feb 08 16:36:21 PST 2014 
The job took 13 seconds. 

One thing I can see is that the reduce is happening before the map tasks have completed.

Do you think this could be causing the problem?

If yes, is there a way to tell the Reduce to wait until the maps are done?

If not, what could be going wrong in the code above?


It is normal for the reducer to "start" before the mappers finish. That is not computation; it is just copying output from the completed map tasks.

Answer


(Edit: removed the earlier, incorrect explanation)

You are applying Reduce as both the combiner and the reducer. The combiner pass works, but its output is then fed back into the same class, where no row has 2 columns any more, so every row is skipped. Your counters show exactly this: Combine output records=95846 goes in, and Reduce output records=0 comes out. You cannot apply this class as a combiner.
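
A minimal sketch of the fix, assuming the pairwise logic in Reduce is otherwise what you intend: simply stop registering it as the combiner in the driver.

job.setMapperClass(Map.class); 
// job.setCombinerClass(Reduce.class); // removed: a combiner's output must be valid 
//                                     // reducer input ("value,timestamp"), but this 
//                                     // Reduce emits a single decline number 
job.setReducerClass(Reduce.class); 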

This code also relies on seeing each key's events sorted by time, but nothing about how it is constructed appears to guarantee that: the values arriving at a reducer are grouped by key, not ordered by timestamp.
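
One way to make the ordering explicit (a sketch of one option; a secondary sort in the shuffle would scale better for keys with many records) is to buffer each key's records in the reducer and sort them by timestamp before pairing them. The collections used here are already covered by the file's import java.util.*;.

public static class Reduce extends Reducer<Text,Text,Text,Text> 
{ 
    @Override 
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
    { 
        // Buffer the "value,timestamp" records so they can be ordered. 
        List<String[]> records = new ArrayList<String[]>(); 
        for (Text value : values) 
        { 
            String[] parts = value.toString().split(","); 
            if (parts.length == 2) 
                records.add(parts); 
        } 

        // Sort chronologically on the epoch-seconds timestamp (index 1). 
        Collections.sort(records, new Comparator<String[]>() { 
            public int compare(String[] a, String[] b) { 
                return Long.valueOf(a[1]).compareTo(Long.valueOf(b[1])); 
            } 
        }); 

        // Pair consecutive records, as the original loop intended. 
        for (int i = 0; i + 1 < records.size(); i += 2) 
        { 
            double vDiff = Double.parseDouble(records.get(i + 1)[0]) - Double.parseDouble(records.get(i)[0]); 
            long tDiff = Long.parseLong(records.get(i + 1)[1]) - Long.parseLong(records.get(i)[1]); 
            if (tDiff != 0) 
                context.write(key, new Text(String.valueOf(vDiff / tDiff))); 
        } 
    } 
} 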

(There are some other small oddities here too, like a pointless StringBuffer (which should be a StringBuilder anyway if you need one), incorrectly continuing after the ParseException is caught, so the record is still emitted with t1 = 0, not importing ParseException by name, and parsing the long timestamps as doubles.)
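
For reference, a cleaned-up mapper along those lines might look like the sketch below. It imports ParseException by name, keeps the timestamp as a long, and skips the record when the date cannot be parsed instead of falling through and emitting it with t1 = 0.

// Additional import needed at the top of the file: 
// import java.text.ParseException; 

public static class Map extends Mapper<LongWritable,Text,Text,Text> 
{ 
    // One format instance per mapper is fine: each map task runs single-threaded. 
    private final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 

    @Override 
    public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException 
    { 
        String[] split = line.toString().split(","); 
        if (split.length != 3) 
            return;  // malformed row: skip it 

        long t1; 
        try 
        { 
            t1 = df.parse(split[2]).getTime() / 1000;  // epoch seconds, kept as a long 
        } 
        catch (ParseException e) 
        { 
            return;  // unparseable date: skip the record rather than emit t1 = 0 
        } 

        // Plain concatenation; no StringBuffer/StringBuilder needed for two fields. 
        context.write(new Text(split[0]), new Text(split[1] + "," + t1)); 
    } 
} 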