2016-05-16 104 views
1

從下面的代碼我不明白兩兩件事:不瞭解路徑分佈路徑

  1. DistributedCache.addcachefile(new URI ('/abc.dat'), job.getconfiguration())

我不明白URI路徑必須存在於HDFS。糾正我,如果我錯了。

  • 什麼是p.getname().equals()從下面的代碼:

    public class MyDC { 
    
    public static class MyMapper extends Mapper < LongWritable, Text, Text, Text > { 
    
        private Map < String, String > abMap = new HashMap < String, String >(); 
    
        private Text outputKey = new Text(); 
    
        private Text outputValue = new Text(); 
    
        protected void setup(Context context) throws 
        java.io.IOException, InterruptedException { 
    
         Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
    
         for (Path p: files) { 
    
          if (p.getName().equals("abc.dat")) { 
    
           BufferedReader reader = new BufferedReader(new FileReader(p.toString())); 
    
           String line = reader.readLine(); 
    
           while (line != null) { 
    
            String[] tokens = line.split("\t"); 
    
            String ab = tokens[0]; 
    
            String state = tokens[1]; 
    
            abMap.put(ab, state); 
    
            line = reader.readLine(); 
    
           } 
    
          } 
    
         } 
    
         if (abMap.isEmpty()) { 
    
          throw new IOException("Unable to load Abbrevation data."); 
    
         } 
    
        } 
    
        protected void map(LongWritable key, Text value, Context context) 
        throws java.io.IOException, InterruptedException { 
    
         String row = value.toString(); 
    
         String[] tokens = row.split("\t"); 
    
         String inab = tokens[0]; 
    
         String state = abMap.get(inab); 
    
         outputKey.set(state); 
    
         outputValue.set(row); 
    
         context.write(outputKey, outputValue); 
    
        } 
    
    } 
    
    public static void main(String[] args) 
    throws IOException, ClassNotFoundException, InterruptedException { 
    
        Job job = new Job(); 
    
        job.setJarByClass(MyDC.class); 
    
        job.setJobName("DCTest"); 
    
        job.setNumReduceTasks(0); 
    
        try { 
    
         DistributedCache.addCacheFile(new URI("/abc.dat"), job.getConfiguration()); 
    
        } catch (Exception e) { 
    
         System.out.println(e); 
    
        } 
    
        job.setMapperClass(MyMapper.class); 
    
        job.setMapOutputKeyClass(Text.class); 
    
        job.setMapOutputValueClass(Text.class); 
    
    
        FileInputFormat.addInputPath(job, new Path(args[0])); 
    
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    
        job.waitForCompletion(true); 
    
    } 
    
    } 
    
  • 回答

    0

    DistributedCache是​​用於添加一個文件或一組在存儲器中的文件的,將可爲每一個API數據節點是否可以使用map-reduce。使用分佈式緩存的一個例子是地圖邊連接。

    DistributedCache.addcachefile(new URI('/abc.dat'),job.getconfiguration())將在緩存區域中添加abc.dat文件。緩存中可以有n個文件,p.getName()。equals(「abc.dat」))會檢查你需要的文件。 HDFS中的每條路徑都將在Path []下進行map-reduce處理。例如:

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    

    第一路徑(參數[0])是第一個參數 (輸入文件位置)傳遞而罐執行與路徑(參數[1])是第二個參數,其輸出文件位置。一切都被視爲Path數組。

    以同樣的方式,當你添加任何文件緩存時,它將被排列在Path數組中,你不能使用下面的代碼來檢索它。

    Path [] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());

    它將返回緩存中的所有文件,並且您將通過p.getName()。equals()方法獲取文件名。

    +0

    謝謝你!這非常有幫助 – Sri

    1

    分佈式緩存的想法是做一些靜態的數據之前提供給任務節點它開始執行。

    文件已存在於HDFS,以便它可以將其添加到分佈式緩存(每個任務節點)

    DistributedCache.getLocalCacheFile基本上得到所有存在於任務節點的緩存文件。通過if (p.getName().equals("abc.dat")) {您正在獲取適當的緩存文件以供您的應用程序處理。

    請參考文檔如下:

    https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#DistributedCache

    https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/filecache/DistributedCache.html#getLocalCacheFiles(org.apache.hadoop.conf.Configuration)

    +0

    感謝您的精彩迴應!我清楚明白了! – Sri