Hadoop's DistributedCache documentation doesn't seem to describe how to use the distributed cache in enough detail. Here is the example it gives (using Hadoop's DistributedCache with archives):
// Setting up the cache for the application
1. Copy the requisite files to the FileSystem:
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. Setup the application's JobConf:
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
3. Use the cached files in the Mapper or Reducer:
public static class MapClass extends MapReduceBase
    implements Mapper<K, V, K, V> {

  private Path[] localArchives;
  private Path[] localFiles;

  public void configure(JobConf job) {
    // Get the cached archives/files
    File f = new File("./map.zip/some/file/in/zip.txt");
  }

  public void map(K key, V value,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Use data from the cached archives/files here
    // ...
    // ...
    output.collect(k, v);
  }
}
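As an aside, the localArchives and localFiles fields in that example are declared but never filled; one way to populate them with the old mapred API is sketched below (the class name and key/value types are placeholders of mine, not from the documentation):

import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CacheAwareMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private Path[] localArchives;
  private Path[] localFiles;

  public void configure(JobConf job) {
    try {
      // Local paths on the task node where the framework unpacked the cached archives/files
      localArchives = DistributedCache.getLocalCacheArchives(job);
      localFiles = DistributedCache.getLocalCacheFiles(job);
    } catch (IOException e) {
      // configure() cannot declare checked exceptions, so rethrow unchecked
      throw new RuntimeException("Could not read DistributedCache paths", e);
    }
  }

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // localArchives[0] is the local directory holding the unpacked map.zip
    output.collect(new Text(localArchives[0].toString()), value);
  }
}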
I spent an hour trying to figure out how to use this. After piecing together several other SO questions, here is what I came up with:
public static void main(String[] args) throws Exception {
  Job job = new Job(new JobConf(), "Job Name");
  Configuration conf = job.getConfiguration();
  DistributedCache.createSymlink(conf);
  DistributedCache.addCacheArchive(new URI("/ProjectDir/LookupTable.zip"), conf);
  // *Rest of configuration code*
}
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

  private Path[] localArchives;

  public void configure(JobConf job) {
    // Get the cached archive
    File file1 = new File("./LookupTable.zip/file1.dat");
    BufferedReader br1index = new BufferedReader(new FileReader(file1));
  }

  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // *Map code*
  }
}
- Where should I call the void configure(JobConf job) function?
- Where can I use the private Path[] localArchives object?
- Is the code in my configure() function the right way to access a file inside the archive and hook it up to a BufferedReader?
Are the 'setup' and 'configure' functions supposed to go inside the Mapper class? 'JobConf job' would be out of scope there. Also, I'm getting undefined-symbol errors for 'DistributedCache' and 'URI'. Any idea what I'm missing? Thanks for all your help! – LeonardBlunderbuss
setup comes from the new API and is used inside the mapper class. I suggest you start with the new API; see http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Mapper.html – rVr
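Following up on that suggestion, here is a rough sketch of what the mapper might look like against the new org.apache.hadoop.mapreduce API, reading the cache in setup(Context) instead of configure(JobConf); the class name and the file name inside the archive are placeholders, not something confirmed in the question:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LookupMapper extends Mapper<Object, Text, Text, IntWritable> {

  private Path[] localArchives;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // Local directories where the framework unpacked the cached archives on this node
    localArchives = DistributedCache.getLocalCacheArchives(context.getConfiguration());

    // Read one file out of the unpacked archive ("file1.dat" is a placeholder name)
    Path lookupFile = new Path(localArchives[0], "file1.dat");
    BufferedReader reader = new BufferedReader(new FileReader(lookupFile.toString()));
    try {
      // ... load the lookup table into memory here ...
    } finally {
      reader.close();
    }
  }

  @Override
  protected void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // *Map code*
  }
}

Loading the lookup data once in setup() keeps it in memory for every subsequent map() call in that task.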