2014-02-07 73 views
0

Hadoop的DistributedCache文檔似乎並未充分描述如何使用分佈式緩存。這裏給出的例子:將Hadoop DistributedCache與存檔結合使用

// Setting up the cache for the application 

1. Copy the requisite files to the FileSystem: 

$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat 
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip 
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar 
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar 
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz 
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz 

2. Setup the application's JobConf: 

JobConf job = new JobConf(); 
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
           job); 
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); 
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job); 

3. Use the cached files in the Mapper 
or Reducer: 

public static class MapClass extends MapReduceBase 
implements Mapper<K, V, K, V> { 

    private Path[] localArchives; 
    private Path[] localFiles; 

    public void configure(JobConf job) { 
    // Get the cached archives/files 
    File f = new File("./map.zip/some/file/in/zip.txt"); 
    } 

    public void map(K key, V value, 
        OutputCollector<K, V> output, Reporter reporter) 
    throws IOException { 
    // Use data from the cached archives/files here 
    // ... 
    // ... 
    output.collect(k, v); 
    } 
} 

我一直在尋找一個小時試圖找出如何使用這個。拼湊幾個其他SO問題在一起後,這裏就是我想出了:

public static void main(String[] args) throws Exception { 
    Job job = new Job(new JobConf(), "Job Name"); 
    JobConf conf = job.getConfiguration(); 
    DistributedCache.createSymlink(conf); 
    DistributedCache.addCacheArchive(new URI("/ProjectDir/LookupTable.zip", job); 
    // *Rest of configuration code* 
} 

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> 
{ 
    private Path[] localArchives; 

    public void configure(JobConf job) 
    { 
     // Get the cached archive 
     File file1 = new File("./LookupTable.zip/file1.dat"); 
     BufferedReader br1index = new BufferedReader(new FileInputStream(file1)); 
    } 

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
    { // *Map code* } 
} 
  • 我應該在哪裏調用void configure(JobConf job)功能?
  • 我在哪裏可以使用private Path[] localArchives對象?
  • 我的configure()函數中的代碼是否正確地訪問檔案中的文件並將文件鏈接到BufferedReader?

回答

1

我會回答你的問題w.r.t新的API,並使用共同的規範,分佈式緩存

  • 我應該在哪裏調用無效配置(JobConf工作)功能?

框架將調用保護無效的設置(上下文的背景下),在每一個地圖的任務開始方法一次,使用緩存文件通常這裏處理相關的邏輯。例如,讀取文件,並存儲在變量數據在地圖中()函數可以使用被稱爲設置後()

  • 我在哪裏使用專用路徑[] localArchives對象?

它將通常用於setup()方法來檢索緩存文件的路徑。像這樣的東西。

Path[] localArchive =DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
  • 是我在配置()函數來訪問存檔內 文件,使文件和一個BufferedReader鏈接的正確方法的代碼?

它缺少一個調用方法來檢索存儲緩存文件的路徑(如上所示)。一旦路徑被檢索到,文件可以被讀取如下。

FSDataInputStream in = fs.open(localArchive); 
BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
+0

'''setup'和'configure'函數是否在Mapper類中? 「JobConf作業」將超出範圍。另外,我收到了'DistributedCache'和'URI'的未定義符號錯誤。任何想法我失蹤?感謝您所有的幫助! – LeonardBlunderbuss

+0

安裝程序來自新API並在映射程序類中使用。我建議您在此處開始使用新API http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Mapper.html – rVr