2012-07-05 54 views
15

I have a folder in HDFS that contains two subfolders, each with about 30 subfolders of its own, and each of those finally contains the XML files. I want to list all the XML files given only the main folder's path. Locally I can do this with Apache commons-io's FileUtils.listFiles(). I have tried this:

FileStatus[] status = fs.listStatus(new Path(args[ 0 ])); 

but it only lists the first two subfolders and does not go any deeper. Is there a way to do this in Hadoop?
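
For reference, the local approach mentioned above with commons-io looks roughly like this (a sketch, not part of the original question; the directory path is a placeholder):

// Sketch of the local (non-HDFS) listing with Apache commons-io; "/data/main" is a placeholder path
import java.io.File;
import java.util.Collection;
import org.apache.commons.io.FileUtils;

public class LocalXmlLister {
    public static void main(String[] args) {
        // recurse into all subdirectories and keep only *.xml files
        Collection<File> xmlFiles =
                FileUtils.listFiles(new File("/data/main"), new String[] {"xml"}, true);
        for (File f : xmlFiles) {
            System.out.println(f.getAbsolutePath());
        }
    }
}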

Answers

14

You'll need to use the FileSystem object and perform some logic on the resulting FileStatus objects to manually recurse into the subdirectories.

You can also apply a PathFilter to return only the XML files, using the listStatus(Path, PathFilter) method.
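
A minimal sketch of that idea (not from the original answer; the root path is taken from args[0] and the class/helper names are made up). Note that passing the filter straight to listStatus would also drop subdirectories whose names do not end in .xml, so the sketch recurses first and applies the filter to files only:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class XmlLister {
    // PathFilter that keeps only *.xml paths
    private static final PathFilter XML_FILTER = new PathFilter() {
        public boolean accept(Path path) {
            return path.getName().endsWith(".xml");
        }
    };

    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        List<Path> xmlFiles = new ArrayList<Path>();
        collect(fs, new Path(args[0]), xmlFiles);
        for (Path p : xmlFiles) {
            System.out.println(p);
        }
    }

    // manual recursion: descend into every subdirectory, apply the filter to files
    private static void collect(FileSystem fs, Path dir, List<Path> result) throws IOException {
        for (FileStatus status : fs.listStatus(dir)) {
            if (status.isDirectory()) {
                collect(fs, status.getPath(), result);
            } else if (XML_FILTER.accept(status.getPath())) {
                result.add(status.getPath());
            }
        }
    }
}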

The Hadoop FsShell class has an example of this for the hadoop fs -lsr command, which is a recursive ls - see the source, around line 590 (the recursive step is triggered on line 635).

+0

In the end I made a simpler implementation than what you suggested, but you gave me the idea. Thanks! – nik686 2012-07-05 16:47:18

+3

The referenced link is broken – AkD 2015-10-13 22:06:23

12

Have you tried this:

import java.io.*; 
import java.util.*; 
import java.net.*; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class cat {
    public static void main(String[] args) throws Exception {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            // you need to pass in your hdfs path here
            FileStatus[] status = fs.listStatus(new Path("hdfs://test.com:9000/user/test/in"));

            // print the contents of every entry returned by listStatus
            for (int i = 0; i < status.length; i++) {
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
                String line = br.readLine();
                while (line != null) {
                    System.out.println(line);
                    line = br.readLine();
                }
            }
        } catch (Exception e) {
            System.out.println("File not found");
        }
    }
}
+0

Yes, I have seen that same example; I mentioned it above. But it only lists subdirectories to depth 1. I want to get the final files starting from the main folder – nik686 2012-07-05 10:37:44

1

Here is a code snippet that counts the number of files in a particular HDFS directory (I used it to determine how many reducers to use in some ETL code). You can easily modify it to suit your needs.

private int calculateNumberOfReducers(String input) throws IOException { 
    int numberOfReducers = 0; 
    Path inputPath = new Path(input); 
    FileSystem fs = inputPath.getFileSystem(getConf()); 
    FileStatus[] statuses = fs.globStatus(inputPath); 
    for(FileStatus status: statuses) { 
     if(status.isDirectory()) { 
      numberOfReducers += getNumberOfInputFiles(status, fs); 
     } else if(status.isFile()) { 
      numberOfReducers ++; 
     } 
    } 
    return numberOfReducers; 
} 

/** 
* Recursively determines number of input files in an HDFS directory 
* 
* @param status instance of FileStatus 
* @param fs instance of FileSystem 
* @return number of input files within particular HDFS directory 
* @throws IOException 
*/ 
private int getNumberOfInputFiles(FileStatus status, FileSystem fs) throws IOException { 
    int inputFileCount = 0; 
    if(status.isDirectory()) { 
     FileStatus[] files = fs.listStatus(status.getPath()); 
     for(FileStatus file: files) { 
      inputFileCount += getNumberOfInputFiles(file, fs); 
     } 
    } else { 
     inputFileCount ++; 
    } 

    return inputFileCount; 
} 
19

If you are using the Hadoop 2.* APIs there is a more elegant solution:

Configuration conf = getConf();
Job job = Job.getInstance(conf);
FileSystem fs = FileSystem.get(conf);

//the second boolean parameter here sets the recursion to true
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
        new Path("path/to/lib"), true);
while (fileStatusListIterator.hasNext()) {
    LocatedFileStatus fileStatus = fileStatusListIterator.next();
    //do stuff with the file like ...
    job.addFileToClassPath(fileStatus.getPath());
}
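
Since the original question asks for the XML files only, you could also filter inside that loop; a minimal sketch (the path is a placeholder, not from the original answer):

// keep only *.xml files while iterating recursively; "path/to/xml/root" is a placeholder
RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("path/to/xml/root"), true);
while (it.hasNext()) {
    LocatedFileStatus status = it.next();
    // listFiles already returns files only, so just check the extension
    if (status.getPath().getName().endsWith(".xml")) {
        System.out.println(status.getPath());
    }
}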
+1

What is the getConf() method? – 2014-12-19 13:44:17

+0

'getConf()' is a method of the 'Configured' class. Ideally your class should extend it. – 2014-12-30 10:16:35
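
To illustrate that comment, a minimal sketch (the class name and job logic are illustrative, not from the original answers): getConf() becomes available once the driver extends Configured, typically together with the Tool interface.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // inherited from Configured
        // ... use conf, e.g. FileSystem.get(conf)
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyDriver(), args));
    }
}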

6
/**
* @param filePath HDFS path to list
* @param fs FileSystem instance
* @return list of absolute file paths present under the given path
* @throws FileNotFoundException
* @throws IOException
*/
public static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException { 
    List<String> fileList = new ArrayList<String>(); 
    FileStatus[] fileStatus = fs.listStatus(filePath); 
    for (FileStatus fileStat : fileStatus) { 
     if (fileStat.isDirectory()) { 
      fileList.addAll(getAllFilePath(fileStat.getPath(), fs)); 
     } else { 
      fileList.add(fileStat.getPath().toString()); 
     } 
    } 
    return fileList; 
} 

A quick example: suppose you have the following file structure:

a -> b
  -> c -> d
       -> e
  -> d -> f

Using the code above, you get:

a/b 
a/c/d 
a/c/e 
a/d/f 

If you want only the leaves (i.e. the file names), use the following code in the else block:

... 
    } else { 
     String fileName = fileStat.getPath().toString(); 
     fileList.add(fileName.substring(fileName.lastIndexOf("/") + 1)); 
    } 

This will give:

b 
d 
e 
f 
0

An approach that does not use recursion (no heap issues) :) Use a queue:

queue.add(param_dir)
while (queue is not empty) {
    directory = queue.pop
    - get items from current directory
    - if item is a file, add it to a list (the final list)
    - if item is a directory => queue.push
}

It is quite simple, enjoy!

0

Thanks to Radu Adrian Moldovan for the suggestion.

Below is an implementation using a queue:

private static List<String> listAllFilePath(Path hdfsFilePath, FileSystem fs)
        throws FileNotFoundException, IOException {
    List<String> filePathList = new ArrayList<String>();
    Queue<Path> fileQueue = new LinkedList<Path>();
    fileQueue.add(hdfsFilePath);
    while (!fileQueue.isEmpty()) {
        Path filePath = fileQueue.remove();
        if (fs.isFile(filePath)) {
            filePathList.add(filePath.toString());
        } else {
            FileStatus[] fileStatus = fs.listStatus(filePath);
            for (FileStatus fileStat : fileStatus) {
                fileQueue.add(fileStat.getPath());
            }
        }
    }
    return filePathList;
}
0

Now one can use Spark to do the same, and it is way faster than other approaches (such as Hadoop MR). Here is the code snippet (a recursive traversal; a non-recursive version is also possible):

def traverseDirectory(filePath: String, recursiveTraverse: Boolean, filePaths: ListBuffer[String]) {
  val files = FileSystem.get(sparkContext.hadoopConfiguration).listStatus(new Path(filePath))
  files.foreach { fileStatus =>
    if (!fileStatus.isDirectory() && fileStatus.getPath().getName().endsWith(".xml")) {
      filePaths += fileStatus.getPath().toString()
    }
    else if (fileStatus.isDirectory()) {
      traverseDirectory(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
    }
  }
}
0

Code snippet:

//helper method to get the list of files from the HDFS path 
public static List<String> 
    listFilesFromHDFSPath(Configuration hadoopConfiguration, 
          String hdfsPath, 
          boolean recursive) throws IOException, 
             IllegalArgumentException 
{ 
    //resulting list of files 
    List<String> filePaths = new ArrayList<String>(); 

    //get path from string and then the filesystem 
    Path path = new Path(hdfsPath); //throws IllegalArgumentException 
    FileSystem fs = path.getFileSystem(hadoopConfiguration); 

    //if recursive approach is requested 
    if(recursive) 
    { 
     //(heap issues with recursive approach) => using a queue 
     Queue<Path> fileQueue = new LinkedList<Path>(); 

     //add the obtained path to the queue 
     fileQueue.add(path); 

     //while the fileQueue is not empty 
     while (!fileQueue.isEmpty()) 
     { 
      //get the file path from queue 
      Path filePath = fileQueue.remove(); 

      //filePath refers to a file 
      if (fs.isFile(filePath)) 
      { 
       filePaths.add(filePath.toString()); 
      } 
      else //else filePath refers to a directory 
      { 
       //list paths in the directory and add to the queue 
       FileStatus[] fileStatuses = fs.listStatus(filePath); 
       for (FileStatus fileStatus : fileStatuses) 
       { 
        fileQueue.add(fileStatus.getPath()); 
       } // for 
      } // else 

     } // while 

    } // if 
    else  //non-recursive approach => no heap overhead 
    { 
     //if the given hdfsPath is actually directory 
     if(fs.isDirectory(path)) 
     { 
      FileStatus[] fileStatuses = fs.listStatus(path); 

      //loop all file statuses 
      for(FileStatus fileStatus : fileStatuses) 
      { 
       //if the given status is a file, then update the resulting list 
       if(fileStatus.isFile()) 
        filePaths.add(fileStatus.getPath().toString()); 
      } // for 
     } // if 
     else  //it is a file then 
     { 
      //return the one and only file path to the resulting list 
      filePaths.add(path.toString()); 
     } // else 

    } // else 

    //close filesystem; no more operations 
    fs.close(); 

    //return the resulting list 
    return filePaths; 
} // listFilesFromHDFSPath