2014-01-20 21 views
33

我的地圖任務需要一些配置數據,我希望通過分佈式緩存進行分發。Hadoop DistributedCache已棄用 - 首選API是什麼?

Hadoop的MapReduce Tutorial顯示DistributedCache類的usage,大致如下:

// In the driver 
JobConf conf = new JobConf(getConf(), WordCount.class); 
... 
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper 
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job); 
... 

然而,DistributedCache是Hadoop中2.2.0 marked as deprecated

什麼是實現此目的的首選新方法?是否有涵蓋此API的最新示例或教程?

回答

46

分佈式緩存的API可以在Job類本身中找到。這裏檢查文檔:http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html 代碼應該是這樣的

Job job = new Job(); 
... 
job.addCacheFile(new Path(filename).toUri()); 

在你的映射器代碼:

Path[] localPaths = context.getLocalCacheFiles(); 
... 
+0

謝謝 - 我假定我需要使用更新的'mapreduce' API而不是'mapred',否則'JobContext'對象不會提供給映射器。 – DNA

+0

是的,你是對的。 – user2371156

+10

我認爲'getLocalCacheFiles()'被棄用,但'getCacheFiles()'確定 - 雖然返回的URI不是路徑。 – DNA

5

新DistributedCache API紗線/ MR2在org.apache.hadoop.mapreduce.Job類中。

Job.addCacheFile() 

不幸的是,目前還沒有很多全面的教程式的例子。

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

+0

我不知道如何檢索這些使用'Job.addCacheFile(URI)'添加的緩存文件。它不適合我使用舊的方式('context.getCacheFiles()'),因爲這些文件是空的。 – tolgap

15

要擴大@jtravaglini,使用DistributedCache紗線的首選方式/ MapReduce的2如下:

在你的驅動程序,使用Job.addCacheFile()

public int run(String[] args) throws Exception { 
    Configuration conf = getConf(); 

    Job job = Job.getInstance(conf, "MyJob"); 

    job.setMapperClass(MyMapper.class); 

    // ... 

    // Mind the # sign after the absolute file location. 
    // You will be using the name after the # sign as your 
    // file name in your Mapper/Reducer 
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some")); 
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other")); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

而且在您的Mapper/Reducer,覆蓋setup(Context context)方法:

@Override 
protected void setup(
     Mapper<LongWritable, Text, Text, Text>.Context context) 
     throws IOException, InterruptedException { 
    if (context.getCacheFiles() != null 
      && context.getCacheFiles().length > 0) { 

     File some_file = new File("./some"); 
     File other_file = new File("./other"); 

     // Do things to these two files, like read them 
     // or parse as JSON or whatever. 
    } 
    super.setup(context); 
} 
0

我有同樣的問題。 DistributedCach不僅是棄用的,而且也是getLocalCacheFiles和「new Job」。那麼,什麼工作對我來說是這樣的:

司機:

Configuration conf = getConf(); 
Job job = Job.getInstance(conf); 
... 
job.addCacheFile(new Path(filename).toUri()); 

在映射/減速設置:

@Override 
protected void setup(Context context) throws IOException, InterruptedException 
{ 
    super.setup(context); 

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null 

    Path file1path = new Path(files[0]) 
    ... 
} 
2

我沒有用job.addCacheFile()。相反,我使用了像-files /path/to/myfile.txt#myfile這樣的-files選項。然後,在映射器或減速器代碼我使用下面的方法:

/** 
* This method can be used with local execution or HDFS execution. 
* 
* @param context 
* @param symLink 
* @param throwExceptionIfNotFound 
* @return 
* @throws IOException 
*/ 
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException 
{ 
    URI[] uris = context.getCacheFiles(); 
    if(uris==null||uris.length==0) 
    { 
     if(throwExceptionIfNotFound) 
      throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache"); 
     return null; 
    } 
    URI symlinkUri = null; 
    for(URI uri: uris) 
    { 
     if(symLink.equals(uri.getFragment())) 
     { 
      symlinkUri = uri; 
      break; 
     } 
    } 
    if(symlinkUri==null) 
    { 
     if(throwExceptionIfNotFound) 
      throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache"); 
     return null; 
    } 
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink 
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink); 

} 

然後在映射器/減速器:

@Override 
protected void setup(Context context) throws IOException, InterruptedException 
{ 
    super.setup(context); 

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true); 
    ... do work ... 
} 

注意,如果我使用的「-files /path/to/myfile.txt」直接然後我需要使用「myfile.txt」來訪問該文件,因爲這是默認的符號鏈接名稱。

1

沒有提到的解決方案爲我工作的完整性。這可能是因爲Hadoop版本不斷變化,我正在使用hadoop 2.6.4。本質上,DistributedCache已被棄用,所以我不想使用它。正如一些帖子建議我們使用addCacheFile(),但它已經改變了一點。下面是它是如何工作的

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt")); 

這裏X.X.X.X可以是主IP地址或本地主機。 EnglishStop.txt存儲在HDFS的/位置。

hadoop fs -ls/

輸出是

-rw-r--r-- 3 centos supergroup  1833 2016-03-12 20:24 /EnglishStop.txt 
drwxr-xr-x - centos supergroup   0 2016-03-12 19:46 /test 

滑稽,但方便,現在#EnglishStop.txt意味着我們可以在映射器訪問它爲 「EnglishStop.txt」。這裏是代碼相同

public void setup(Context context) throws IOException, InterruptedException  
{ 
    File stopwordFile = new File("EnglishStop.txt"); 
    FileInputStream fis = new FileInputStream(stopwordFile); 
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis)); 

    while ((stopWord = reader.readLine()) != null) { 
     // stopWord is a word read from Cache 
    } 
} 

這只是爲我工作。你可以讀取存儲在HDFS中的文件中的行

相關問題