2014-10-07
2

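Hadoop DistributedCache throws a FileNotFound error

I want to use the listOfWords file so that only the words it contains are counted in any input file. I get a FileNotFoundException, even though I have verified that the file is in the proper location in HDFS.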

Inside the driver:

Configuration conf = new Configuration(); 
    DistributedCache.addCacheFile(new URI("/user/training/listOfWords"), conf); 
    Job job = new Job(conf,"CountEachWord Job"); 

Inside the mapper:

private Path[] ref_file; 
ArrayList<String> globalList = new ArrayList<String>(); 

public void setup(Context context) throws IOException{ 

    this.ref_file = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 

    FileSystem fs = FileSystem.get(context.getConfiguration()); 

    FSDataInputStream in_file = fs.open(ref_file[0]); 
    System.out.println("File opened"); 

    BufferedReader br = new BufferedReader(new InputStreamReader(in_file));//each line of reference file 
    System.out.println("BufferReader invoked"); 

    String eachLine = null; 
    while((eachLine = br.readLine()) != null) 
    { 
     System.out.println("eachLine is: "+ eachLine); 
     globalList.add(eachLine); 

    } 

} 

Error message:

hadoop jar CountOnlyMatchWords.jar CountEachWordDriver Rhymes CountMatchWordsOut1 
Warning: $HADOOP_HOME is deprecated. 

14/10/07 22:28:59 WARN mapred.JobClient: Use GenericOptionsParser for parsing the  arguments.  Applications should implement Tool for the same. 
14/10/07 22:28:59 INFO input.FileInputFormat: Total input paths to process : 1 
14/10/07 22:28:59 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
14/10/07 22:28:59 WARN snappy.LoadSnappy: Snappy native library not loaded 
14/10/07 22:29:00 INFO mapred.JobClient: Running job: job_201409300531_0041 
14/10/07 22:29:01 INFO mapred.JobClient: map 0% reduce 0% 
14/10/07 22:29:14 INFO mapred.JobClient: Task Id : attempt_201409300531_0041_m_000000_0, Status : FAILED 
java.io.FileNotFoundException: File does not exist: /home/training/hadoop-temp/mapred/local/taskTracker/distcache/5910352135771601888_2043607380_1633197895/localhost/user/training/listOfWords

I have verified that the file above exists in HDFS. I also tried running with the local runner. It still does not work.
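One possible reading of the error above: the path returned by getLocalCacheFiles carries no scheme, so a FileSystem obtained from the job configuration may look for it on the default filesystem (HDFS) instead of the task's local disk. The sketch below uses only java.net.URI to illustrate that kind of resolution. The qualify helper, the hdfs://namenode:8020/ address, and the shortened /tmp/distcache path are all hypothetical and are not Hadoop APIs or values from the question.

```java
import java.net.URI;

final class SchemeResolutionSketch {

    // Hypothetical helper (not a Hadoop API): completes a path that has no
    // scheme against a default filesystem URI, and leaves any path that
    // already names its scheme untouched.
    static URI qualify(URI defaultFs, String path) {
        URI raw = URI.create(path);
        return raw.getScheme() != null ? raw : defaultFs.resolve(raw);
    }

    public static void main(String[] args) {
        // Assumed default filesystem address, for illustration only.
        URI defaultFs = URI.create("hdfs://namenode:8020/");

        // A scheme-less local path ends up pointing at the default filesystem.
        System.out.println(qualify(defaultFs, "/tmp/distcache/listOfWords"));

        // A path with an explicit file: scheme keeps pointing at local disk.
        System.out.println(qualify(defaultFs, "file:///tmp/distcache/listOfWords"));
    }
}
```

If this is indeed the cause, reading the localized copy through the local filesystem (for example FileSystem.getLocal(conf), or plain java.io classes) rather than through FileSystem.get(conf) would be the thing to try.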

+0

Instead of DistributedCache.addCacheFile(new URI("/user/training/listOfWords"), conf); try DistributedCache.addCacheFile(new URI("/user/training/listOfWords"), job.getConfiguration()); – user3484461 2014-10-09 06:23:50

Answers

0

You can try this to retrieve the files:

URI[] files = DistributedCache.getCacheFiles(context.getConfiguration());

You can then iterate over the files.

0

Try it like this.

In the driver:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path cachefile = new Path("path/to/file");
// Expand the path (glob patterns allowed) and register every match in the cache
FileStatus[] list = fs.globStatus(cachefile);
for (FileStatus status : list) {
    DistributedCache.addCacheFile(status.getPath().toUri(), conf);
}

In the mapper's setup():

public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    // URIs as they were registered in the driver
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    Path getPath = new Path(cacheFiles[0].getPath());
    // Read the first cache file through the job's FileSystem
    BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(getPath)));
    String setupData = null;
    while ((setupData = bf.readLine()) != null) {
        System.out.println("Setup Line in reducer " + setupData);
    }
    bf.close();
}
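The setup() code above only prints the lines it reads. As a minimal sketch of the step the question is after (counting only the words that appear in the list), the following plain-Java example keeps the lines in a set and filters tokens against it. It deliberately uses no Hadoop types. The class and method names are hypothetical, and a StringReader stands in for the cache file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class WordListFilterSketch {

    // Reads one word per line into a set, skipping blank lines.
    // This is what a setup() method could do with the cached file.
    static Set<String> loadWordList(Reader source) {
        Set<String> words = new HashSet<>();
        try (BufferedReader br = new BufferedReader(source)) {
            String line;
            while ((line = br.readLine()) != null) {
                String word = line.trim();
                if (!word.isEmpty()) {
                    words.add(word);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return words;
    }

    // Counts only the whitespace-separated tokens that are in the allowed set.
    // In a mapper, the equivalent would be emitting (token, 1) for such tokens.
    static Map<String, Integer> countListed(String text, Set<String> allowed) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String token : text.split("\\s+")) {
            if (allowed.contains(token)) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Set<String> allowed = loadWordList(new StringReader("jack\njill\n"));
        System.out.println(countListed("jack and jill went up the hill jack", allowed));
    }
}
```

A HashSet is used here instead of the ArrayList in the question because the lookup happens once per input token, and set membership checks do not slow down as the word list grows.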
+0

Some MR jobs will not find the file this way. – yanghaogn 2017-05-09 04:30:48

0
try {
    // Fetch the centroid file from the distributed cache
    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
    // Check before indexing, since the array may be null or empty
    if (cacheFiles != null && cacheFiles.length > 0) {
        Path getPath = new Path(cacheFiles[0].getPath());
        FileSystem fs = FileSystem.get(job);
        String line;
        centers.clear(); // clear the centers list each time
        try (BufferedReader cacheBufferReader =
                new BufferedReader(new InputStreamReader(fs.open(getPath)))) {
            while ((line = cacheBufferReader.readLine()) != null) {
                centers.add(line);
            }
        } catch (IOException e) {
            System.err.println("Exception: " + e);
        }
    }
} catch (IOException e) {
    System.err.println("Exception: " + e);
}
1

In the main method, I use this:

Job job = Job.getInstance();
job.setJarByClass(DistributedCacheExample.class);
job.setJobName("Distributed cache example");
job.addCacheFile(new Path("/user/cloudera/datasets/abc.dat").toUri());

Then, in the Mapper, I used this boilerplate:

protected void setup(Context context) throws IOException, InterruptedException {
    URI[] files = context.getCacheFiles();
    for (URI file : files) {
        if (file.getPath().contains("abc.dat")) {
            Path path = new Path(file);
            // Open the file by its bare name, not by its full HDFS path
            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));
            String line = reader.readLine();
            while (line != null) {
                ......
            }
        }
    }
}

I am using these dependencies:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.3</version>
</dependency>

The trick is to use path.getName() with FileReader; otherwise I get a FileNotFoundException.
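A likely reason path.getName() works is that the cached file is made available in the task's working directory under its bare file name, or under the URI fragment if one was given when the file was added (for example abc.dat#words). The sketch below shows how such a local name could be derived from a cache URI using only java.net.URI. The localLinkName helper is hypothetical and is not part of Hadoop.

```java
import java.net.URI;

final class CacheLinkNameSketch {

    // Hypothetical helper (not a Hadoop API): returns the name under which a
    // cache file would be expected in the task's working directory.
    // A non-empty URI fragment acts as an alias; otherwise the last path
    // segment is used.
    static String localLinkName(URI cacheUri) {
        String fragment = cacheUri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = cacheUri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // Same path as in the answer above; the "#words" alias is an assumption.
        System.out.println(localLinkName(URI.create("/user/cloudera/datasets/abc.dat")));
        System.out.println(localLinkName(URI.create("/user/cloudera/datasets/abc.dat#words")));
    }
}
```

With a name obtained this way, new FileReader(name) reads the local copy directly, which matches what the answer does with path.getName().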