2016-03-02 267 views

回答

10

我認爲僅將Spark想象爲一個數據處理工具是有幫助的,它具有開始加載數據的域。它可以讀取多種格式,並且支持Hadoop glob表達式,這對於從HDFS中的多個路徑讀取非常有用,但它沒有我知道的用於遍歷目錄或文件的內置工具,也沒有特定於與Hadoop或HDFS交互的實用程序。

有幾種可用的工具可以做你想做的,包括esutilhdfs。 hdfs lib支持CLI和API,你可以直接跳到'如何列出Python中的HDFS文件'here。它看起來像這樣:

from hdfs import Config 
client = Config().get_client('dev') 
files = client.list('the_dir_path') 
+0

嗨,你可以請指導我如何使hdfscli.cfg文件,我不知道要把它放在什麼端口號。 [全球] default.alias =開發 [dev.alias] URL = HTTP://dev.namenode:如果你想過濾結果端口 用戶=安 –

1

如果你想在目錄中的所有文件的讀取,檢查sc.wholeTextFiles[doc],但要注意的是,文件的內容讀入一個單行的價值,這可能不是理想的結果。

如果你想只讀一些文件,那麼生成的路徑列表(使用普通HDFS ls命令加任何過濾你需要),並把它傳遞到sqlContext.read.text[doc],然後從DataFrameRDD轉換恍如最好的方法。

14

使用JVM網關也許是不那麼優雅,但在某些情況下,下面的代碼可能會有所幫助:

URI   = sc._gateway.jvm.java.net.URI 
Path   = sc._gateway.jvm.org.apache.hadoop.fs.Path 
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem 
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration 


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration()) 

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/')) 

for fileStatus in status: 
    print fileStatus.getPath() 
+1

使用'globStatus',而不是'fileStatus' ,例如'status = fs.globStatus(Path('/ some_dir/yet_another_one_dir/*。csv'))' –

+0

這是相當不錯的,因爲它不需要我將其他庫上傳到spark-submit。 –

+0

謝謝你的回答。 – Tony

5

如果使用PySpark,您可以交互執行命令


列表從選定目錄中的所有文件:

hdfs dfs -ls <path>如:hdfs dfs -ls /user/path

import os 
import subprocess 

cmd = 'hdfs dfs -ls /user/path'.split() 
files = subprocess.check_output(cmd).strip().split('\n') 
for path in files: 
    print path 

或尋找文件在選定的目錄:

hdfs dfs -find <path> -name <expression>如:hdfs dfs -find /user/path -name *.txt

import os 
import subprocess 

cmd = 'hdfs dfs -find {} -name *.txt'.format(source_dir).split() 
files = subprocess.check_output(cmd).strip().split('\n') 
for path in files: 
    filename = path.split(os.path.sep)[-1].split('.txt')[0] 
    print path, filename 
+0

嗨不應該 files = subprocess.check_output(cmd_find).strip()。split('\ n') 是 files = subprocess.check_output(cmd).strip()。split('\ n') 我試過編輯,但是SO表示編輯必須大於6次更改。 –

0

有一個簡單的方法來做到這一點使用蛇咬傷文庫

from snakebite.client import Client 

hadoop_client = Client(HADOOP_HOST, HADOOP_PORT, use_trash=False) 

for x in hadoop_client.ls(['/']): 

...  print x