3
我的代碼如下,這是 Cascading 代碼,它有 8 個作業。我不知道如何配置每一個作業:下面的代碼把 8 個作業放在一起統一配置,但我想讓最後一個作業使用較少的 reducer。請問如何識別這 8 個作業,以及如何分別配置它們?謝謝。Cascading:如何在配置中定義每個 map-reduce 作業?
/**
 * Builds and runs a Cascading flow that derives three aggregations (VV, click and st-click)
 * from one filtered source, left-joins them on the video id, and writes the joined rows
 * sorted by {@code totalVV} in descending order.
 *
 * @param args {@code args[0]} source path (kept), {@code args[1]} result path (replaced),
 *             {@code args[2]} trap path (replaced)
 * @throws IllegalArgumentException if {@code args} is null or has fewer than three entries
 */
private static void Demo(String[] args) {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 3) {
        throw new IllegalArgumentException(
                "expected 3 arguments: <sourcePath> <resultPath> <trapPath>, got "
                        + (args == null ? 0 : args.length));
    }
    Tap sourceTap = new Hfs(new TextLine(), args[0], SinkMode.KEEP);
    Tap finalResultTap = new Hfs(new TextLine(), args[1], SinkMode.REPLACE);
    Tap trap = new Hfs(new TextLine(), args[2], SinkMode.REPLACE);

    Pipe sourcePipe = new Pipe("sourcePipe");
    sourcePipe = getFilterPipe(sourcePipe);

    // Each aggregation gets its own named branch off the filtered source, and that branch
    // (not the shared sourcePipe) is what the helper extends.
    Pipe vvResultPipe = getVVResultPipe(new Pipe("vvResultPipe", sourcePipe));
    Pipe clickResultPipe = getClickResultPipe(new Pipe("clickResultPipe", sourcePipe));
    Pipe stClickResultPipe = getStClickResultPipe(new Pipe("stClickResultPipe", sourcePipe));

    // Join the three branch results: VV left-joined with clicks, then with st-clicks.
    Pipe resultPipe = new CoGroup(vvResultPipe, new Fields("vid"), clickResultPipe, new Fields("referVid"),
            new Fields("vid", "totalVV", "referVid", "totalClick"), new LeftJoin());
    resultPipe = new CoGroup(resultPipe, new Fields("vid"), stClickResultPipe, new Fields("referVid"),
            new Fields("vid", "totalVV", "referVid", "totalClick", "referVid2",
                    "st1", "st2", "st3", "st4", "st6", "st8"),
            new LeftJoin());
    // Keep only the output columns; the join-key copies (referVid, referVid2) are dropped.
    resultPipe = new Each(resultPipe,
            new Fields("vid", "totalVV", "totalClick", "st1", "st2", "st3", "st4", "st6", "st8"),
            new Identity(Fields.ARGS));

    // Attach the descending comparator before the sort fields are handed to the GroupBy.
    Fields sortFields = new Fields("totalVV");
    sortFields.setComparators(Collections.reverseOrder());
    // Grouping on Fields.NONE places every tuple in a single group so the whole result can be
    // sorted by totalVV.
    resultPipe = new GroupBy(resultPipe, Fields.NONE, sortFields);

    JobConf conf = new JobConf();
    conf.setJarByClass(Main.class);
    Properties properties = AppProps.appProps().buildProperties(conf);
    properties.setProperty("user.group", "d_sdo_data");
    properties.setProperty("mapred.job.queue.name", "cug_d_sdo_data");
    properties.setProperty("mapred.fairscheduler.pool", "cug_d_sdo_data");
    properties.setProperty("cascading.tmp.dir", "/home/hdfs/cluster-data/tmp/mapred/staging/recommend_user/tmp");
    properties.setProperty("mapreduce.job.complete.cancel.delegation.tokens", "false");
    // NOTE(review): these properties are handed to the connector as a whole, so they apply to
    // every step the planner produces. To override a value for one step only (e.g. fewer
    // reducers for the final GroupBy), Cascading 2.2+ offers Pipe#getStepConfigDef() — verify
    // against the Cascading version in use.
    properties.setProperty("mapred.reduce.tasks", "30");
    properties.setProperty("mapred.map.tasks", "200");

    FlowConnector flowConnector = new HadoopFlowConnector(properties);
    // NOTE(review): the trap is bound to a pipe named "assertions", which this method does not
    // create — confirm that one of the helper assemblies defines a pipe with that name.
    FlowDef flowDef = FlowDef.flowDef()
            .setName("tfidf")
            .addSource(sourcePipe, sourceTap)
            .addTailSink(resultPipe, finalResultTap)
            .addTrap("assertions", trap);
    Flow flow = flowConnector.connect(flowDef);
    flow.complete();
}