2014-10-19 29 views
0

我正在使用Spark 1.1。 我有一個Spark作業,只在一個存儲桶(即以......開頭的文件夾)中尋找某個特定模式的文件夾,並且應該只處理這些文件夾。使用globStatus和Google Cloud Storage存儲桶作爲輸入時無法運行Spark作業

FileSystem fs = FileSystem.get(new Configuration(true)); 
FileStatus[] statusArr = fs.globStatus(new Path(inputPath)); 
List<FileStatus> statusList = Arrays.asList(statusArr); 

List<String> pathsStr = convertFileStatusToPath(statusList); 

JavaRDD<String> paths = sc.parallelize(pathsStr); 

但是,運行谷歌雲存儲路徑上此作業時:GS:我通過以下操作實現這一// rsync的-1/2014_07_31 *(採用最新的谷歌雲存儲連接器1.2.9) ,我得到以下錯誤:

4/10/13 10:28:38 INFO slf4j.Slf4jLogger: Slf4jLogger started  
14/10/13 10:28:38 INFO util.Utils: Successfully started service 'Driver' on port 60379.  
14/10/13 10:28:38 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://[email protected]:45212/user/Worker  
Exception in thread "main" java.lang.reflect.InvocationTargetException  
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)  
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  
    at java.lang.reflect.Method.invoke(Method.java:606)  
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)  
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)  
Caused by: java.lang.IllegalArgumentException: Wrong bucket: rsync-1, in path: gs://rsync-1/2014_07_31*, expected bucket: hadoop-config  
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:100)  
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:294)  
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:457)  
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:163)  
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1052)  
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1027)  
    at com.doit.customer.dataconverter.Phase0.main(Phase0.java:578)  
... 6 more 

當我在本地文件夾上運行此作業時,一切正常。

Hadoop的配置是一個桶我使用部署在谷歌Compute Engine的星火集羣(使用bdutil 0.35.2工具)

回答

4

簡答

而不是使用:

FileSystem fs = FileSystem.get(new Configuration(true)); 
    FileStatus[] statusArr = fs.globStatus(new Path(inputPath)); 
    List<FileStatus> statusList = Arrays.asList(statusArr); 

你需要做的

Path inputPathObj = new Path(inputPath); 
    FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true)); 
    FileStatus[] statusArr = fs.globStatus(inputPathObj); 
    List<FileStatus> statusList = Arrays.asList(statusArr); 

由於在Hadoop中,FileSystem實例是基於URI的schemeauthority組件(以及更高級設置中潛在的用戶組信息)共享的,並且這些實例在方案和權限之間不可互換。

龍回答

這具有與URIhostnamepath部件之間的區別做在[方案]:// [權威]/[路徑],這可能是更明顯在HDFS用例中,但也適用於GCS。基本上,有幾個get方法org.apache.hadoop.fs.FileSystem,這裏最適用的是:

public static FileSystem get(Configuration conf) 

public static FileSystem get(URI uri, Configuration conf) 

前者實際上只是調用了後者:

return get(getDefaultUri(conf), conf); 

其中getDefaultUri(conf)fs.default.namefs.defaultFS定義。第二個考慮因素是具有不同hosthnameauthority組件的FileSystems被認爲是固有不同的文件系統;在HDFS情況下,這是有意義的,因爲:

FileSystem.get("hdfs://foo-cluster-namenode/", conf); 
    FileSystem.get("hdfs://bar-cluster-namenode/", conf); 

每個點可能完全不同的文件系統的情況下,在不同的簇,從而能夠在兩個單獨的HDFS實例使用的相同的路徑名指的是分離存儲的命名空間。雖然GDC中的bucket在機器的「主機名稱」方面透明度較低,但它的確扮演着GCE URI的authority組件 - 在Hadoop中,這意味着當bucket相同時,FileSystem.get字面上返回相同的緩存Java文件系統對象,但不同桶的不同實例。正如你不能創建一個HDFS實例,並在不同的授權點吧:

// Can't mix authorities! 
    FileSystem.get("hdfs://foo/", conf).listStatus(new Path("hdfs://bar/")); 

當你叫FileSystem.get(conf)你有效地得到了一個緩存實例指着gs://hadoop-config/,然後用這來嘗試列出gs://rsync-1

相反,當時你知道你想要操作的路徑,這應該是你取一個文件系統實例的時間:

FileSystem fs = FileSystem.get(myPath.toUri(), new Configuration(true)); 
    fs.globStatus(myPath); 
相關問題