2013-02-27 39 views
3

我的代碼如下,這是一段 Cascading 代碼,它有 8 個 map-reduce 作業。我不知道如何分別配置每一個作業:下面的代碼是把這 8 個作業一起(together)配置的。但我想要做的是讓最後一個作業使用較少的 reduce 任務。我想問如何識別(recognise)這 8 個作業,以及如何分別(separately)配置它們?謝謝。Cascading:如何在配置中定義每個 map-reduce 作業?

/**
 * Builds and runs the "tfidf" Cascading flow.
 *
 * <p>The source is filtered, split into three named branches (vv, click, stClick),
 * the branch results are left-joined on {@code vid = referVid}, and the joined
 * rows are written to the result tap ordered by {@code totalVV} in reverse order.
 *
 * @param args {@code args[0]} source path (kept), {@code args[1]} result path
 *             (replaced), {@code args[2]} trap path (replaced)
 * @throws IllegalArgumentException if fewer than three paths are supplied
 */
private static void Demo(String[] args) {
    // Fail early with a usable message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 3) {
        throw new IllegalArgumentException(
            "expected 3 arguments: <source path> <result path> <trap path>");
    }

    Tap sourceTap = new Hfs(new TextLine(), args[0], SinkMode.KEEP);
    Tap finalResultTap = new Hfs(new TextLine(), args[1], SinkMode.REPLACE);
    Tap trap = new Hfs(new TextLine(), args[2], SinkMode.REPLACE);

    Pipe sourcePipe = new Pipe("sourcePipe");
    sourcePipe = getFilterPipe(sourcePipe);

    // Each branch is built on its own named Pipe so the branch name is actually
    // part of the assembly (previously two of the three named pipes were created
    // and then thrown away because sourcePipe was handed to the helper instead).
    Pipe vvResultPipe = new Pipe("vvResultPipe", sourcePipe);
    vvResultPipe = getVVResultPipe(vvResultPipe);

    Pipe clickResultPipe = new Pipe("clickResultPipe", sourcePipe);
    clickResultPipe = getClickResultPipe(clickResultPipe);

    Pipe stClickResultPipe = new Pipe("stClickResultPipe", sourcePipe);
    stClickResultPipe = getStClickResultPipe(stClickResultPipe);

    // Join the results of the three pipes.
    Pipe resultPipe = new CoGroup(vvResultPipe, new Fields("vid"), clickResultPipe, new Fields("referVid"),
        new Fields("vid", "totalVV", "referVid", "totalClick"), new LeftJoin());
    resultPipe = new CoGroup(resultPipe, new Fields("vid"), stClickResultPipe, new Fields("referVid"),
        new Fields("vid", "totalVV", "referVid", "totalClick", "referVid2", "st1", "st2", "st3", "st4", "st6", "st8"),
        new LeftJoin());
    // Keep only the reporting columns; the join keys referVid/referVid2 are not selected.
    resultPipe = new Each(resultPipe, new Fields("vid", "totalVV", "totalClick", "st1", "st2", "st3", "st4", "st6", "st8"),
        new Identity(Fields.ARGS));

    // Configure the reverse comparator before the Fields instance is handed to
    // GroupBy, so the ordering does not depend on GroupBy sharing the reference.
    Fields sortClickFields = new Fields("totalVV");
    sortClickFields.setComparators(Collections.reverseOrder());
    resultPipe = new GroupBy(resultPipe, Fields.NONE, sortClickFields);

    JobConf conf = new JobConf();
    conf.setJarByClass(Main.class);

    // NOTE(review): everything set here is passed to the flow connector as a whole,
    // so it presumably applies to every job the planner creates. Per-job overrides
    // would have to be attached elsewhere (e.g. Cascading's per-pipe ConfigDef) --
    // confirm against the Cascading version in use.
    Properties properties = AppProps.appProps().buildProperties(conf);
    properties.setProperty("user.group", "d_sdo_data");
    properties.setProperty("mapred.job.queue.name", "cug_d_sdo_data");
    properties.setProperty("mapred.fairscheduler.pool", "cug_d_sdo_data");
    properties.setProperty("cascading.tmp.dir", "/home/hdfs/cluster-data/tmp/mapred/staging/recommend_user/tmp");
    properties.setProperty("mapreduce.job.complete.cancel.delegation.tokens", "false");
    properties.setProperty("mapred.reduce.tasks", "30");
    properties.setProperty("mapred.map.tasks", "200");

    FlowConnector flowConnector = new HadoopFlowConnector(properties);
    // NOTE(review): the trap is registered under the name "assertions"; no pipe with
    // that name is created in this method -- verify one exists in the helper assemblies.
    FlowDef flowDef = FlowDef.flowDef()
        .setName("tfidf")
        .addSource(sourcePipe, sourceTap)
        .addTailSink(resultPipe, finalResultTap)
        .addTrap("assertions", trap);
    Flow flow = flowConnector.connect(flowDef);
    flow.complete();
}

回答

相關問題