
Building an RDD with recursive unions in Spark in Scala

So I'm fairly new to functional programming and to Spark and Scala, so forgive me if this is obvious... but basically I have a list of HDFS files that match certain criteria, i.e. something like this:

val fileList = List(
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000140_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=03/000258_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=05/000270_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000297_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=30/000300_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000362_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=29/000365_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000397_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=15/000436_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=16/000447_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000529_0",
  "hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=17/000585_0")

I now need to build a single RDD to work with out of this list... my idea was to use a recursive union... basically the function would be something like this:

def dostuff(line: String): org.apache.spark.rdd.RDD[String] = {
  sc.textFile(line)  // read one HDFS file into an RDD of lines
}

and then just apply it with a map:

val RDD_list = fileList.map(l => dostuff(l))
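
For reference, a compiling sketch of that map-then-union idea (assuming sc is an in-scope SparkContext and fileList is the list defined above) would look something like this:

import org.apache.spark.rdd.RDD

// Map each path to its own RDD of lines, then fold the RDDs
// together with union (++ is an alias for union on RDDs).
val perFile: List[RDD[String]] = fileList.map(path => sc.textFile(path))
val combined: RDD[String] = perFile.reduce(_ ++ _)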

Answer


You can read all of the files into a single RDD like this:

val sc = new SparkContext(...)
// the wildcards match every partday directory and file under partmonth=06
sc.textFile("hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/*/*")
    .map(line => ...)
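
As a side note, textFile also accepts a comma-separated string of paths, so if you only want the exact files in your list rather than everything the wildcard matches, a sketch along these lines should work:

// Read only the specific files named in fileList, in a single call;
// sc.textFile accepts a comma-separated list of paths.
val exact = sc.textFile(fileList.mkString(","))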

Brilliant! Thanks! Should have thought of that... A follow-up question, though... so now I have something like this: