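There are two things I don't understand in the code below. First, the path passed to DistributedCache:
DistributedCache.addCacheFile(new URI("/abc.dat"), job.getConfiguration())
I don't understand the URI path here; I assume it has to exist in HDFS. Correct me if I'm wrong.
Second, what is
p.getName().equals()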
in the code below:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MyDC {

        public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

            // Lookup table: abbreviation -> state, loaded from the cached file.
            private Map<String, String> abMap = new HashMap<String, String>();
            private Text outputKey = new Text();
            private Text outputValue = new Text();

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // Local copies of every file registered with the DistributedCache.
                Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                for (Path p : files) {
                    // Pick out the one file this mapper needs, by file name.
                    if (p.getName().equals("abc.dat")) {
                        // try-with-resources closes the reader when loading is done.
                        try (BufferedReader reader = new BufferedReader(new FileReader(p.toString()))) {
                            String line = reader.readLine();
                            while (line != null) {
                                String[] tokens = line.split("\t");
                                String ab = tokens[0];
                                String state = tokens[1];
                                abMap.put(ab, state);
                                line = reader.readLine();
                            }
                        }
                    }
                }
                if (abMap.isEmpty()) {
                    throw new IOException("Unable to load abbreviation data.");
                }
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String row = value.toString();
                String[] tokens = row.split("\t");
                String inab = tokens[0];
                // Look up the state for the abbreviation in the first column.
                String state = abMap.get(inab);
                outputKey.set(state);
                outputValue.set(row);
                context.write(outputKey, outputValue);
            }
        }

        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {
            Job job = new Job();
            job.setJarByClass(MyDC.class);
            job.setJobName("DCTest");
            job.setNumReduceTasks(0); // map-only job
            try {
                DistributedCache.addCacheFile(new URI("/abc.dat"), job.getConfiguration());
            } catch (Exception e) {
                System.out.println(e);
            }
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }
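For what it's worth, here is a rough sketch of what the file-name check and the loading loop in setup() seem to be doing, written so it runs without a Hadoop installation. As far as I know, Hadoop's Path.getName() returns only the final component of the path, so the comparison selects the local copy of abc.dat from whatever files were cached; the sketch uses java.nio.file.Path.getFileName() as a stand-in for it. The class name CacheLookupSketch, the helpers isWantedFile and loadTable, and the sample paths and rows are all made up for illustration. On the first question, the DistributedCache documentation does describe the file as expected to already exist on the job's file system (usually HDFS) at the given path, but please verify that against your Hadoop version.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; it does not use any Hadoop classes.
final class CacheLookupSketch {

    // Stand-in for p.getName().equals(wanted): compare only the last path component.
    static boolean isWantedFile(Path p, String wanted) {
        Path fileName = p.getFileName();
        return fileName != null && fileName.toString().equals(wanted);
    }

    // Parse "abbreviation<TAB>state" lines into a lookup map, skipping malformed rows.
    static Map<String, String> loadTable(BufferedReader reader) {
        Map<String, String> table = new HashMap<>();
        reader.lines().forEach(line -> {
            String[] tokens = line.split("\t");
            if (tokens.length >= 2) {
                table.put(tokens[0], tokens[1]);
            }
        });
        return table;
    }

    public static void main(String[] args) {
        // Made-up local paths, similar in spirit to what getLocalCacheFiles() returns.
        Path[] localized = {
            Paths.get("/tmp/cache/other.txt"),
            Paths.get("/tmp/cache/abc.dat")
        };
        for (Path p : localized) {
            System.out.println(p + " matches abc.dat: " + isWantedFile(p, "abc.dat"));
        }

        // Made-up sample rows standing in for the contents of abc.dat.
        BufferedReader reader =
            new BufferedReader(new StringReader("CA\tCalifornia\nNY\tNew York\n"));
        Map<String, String> table = loadTable(reader);
        System.out.println("CA maps to " + table.get("CA"));
    }
}
```

The directory part of the path is ignored in the comparison, which matters because the local cache directory on each task node is chosen by the framework and is not known in advance.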
Thank you! This is very helpful. – Sri