2015-11-02 38 views
0

我想在下面的代碼中處理多個avro文件。這個想法是首先在列表中獲得一系列avro文件。然後打開每個avro文件並生成元組(String,int)。然後最後通過密鑰對元組流進行分組,然後求和整數。在返回RDD的函數上做flatmap

object AvroCopyUtil { 
    def main(args: Array[String]) : Unit = { 

    val conf = new SparkConf().setAppName("Leads Data Analysis").setMaster("local[*]") 
    val sc = new SparkContext(conf) 

    val fs = FileSystem.get(new Configuration()) 
    val avroList = GetAvroList(fs, args(0)) 
    avroList.flatMap(av => 
     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av) 
     .map(r => (r._1.datum.get("field").toString, 1))) 
     .reduceByKey(_ + _) 
     .foreach(println) 
    } 


    def GetAvroList(fs: FileSystem, input: String) : List[String] = { 
    // get all children 
    val masterList : List[FileStatus] = fs.listStatus(new Path(input)).toList 
    val (allFiles, allDirs) = masterList.partition(x => x.isDirectory == false) 
    allFiles.map(_.getPath.toString) ::: allDirs.map(_.getPath.toString).flatMap(x => GetAvroList(fs, x)) 
    } 
} 

編譯錯誤我得到的是

[error] found : org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] 
[error] required: TraversableOnce[?] 
[error]  avroRdd.flatMap(av => sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av)) 
[error]                              ^
[error] one error found 

編輯:基於下面我試圖

val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, 
AvroKeyInputFormat[GenericRecord]](avroList.mkString(",")) 

,但我得到了錯誤

Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in scheme name at index 0: 2015-10- 
15-00-1576041136-flumetracker.foo.com-FooAvroEvent.1444867200044.avro,hdfs: 

回答

1

你的功能建議是不必要的。你也試圖在一個沒有意義的轉換中創建一個RDD。轉換(在這種情況下,flatMap)在RDD之上運行,並且RDD中的記錄將被轉換。在flatMap的情況下,匿名函數的預期輸出是一個TraversableOnce對象,然後該對象將通過轉換被平鋪爲多個記錄。仔細看看你的代碼,你並不需要做一個flatMap,因爲只需要map即可。請記住,由於RDD的不變性,您必須始終將您的轉換重新分配爲新的值。

嘗試類似:

val avroRDD = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](filePath) 
val countsRDD = avroRDD.map(av => (av._1.datum.get("field1").toString, 1)).reduceByKey(_ + _) 

它好像你可能需要花一些時間來掌握一些星火的基本框架的細微差別。我會建議您完全閱讀Spark Programming Guide。最後,如果您想使用Avro,請同時查看spark-avro,與Avro一起工作的鍋爐板很多都在那裏照顧(DataFrame可能更直觀,更易於使用)。

(編輯:)

好像你可能誤會如何加載Spark中要處理的數據。 parallelize()方法用於在RDD中分發集合,而不是在文件中分發數據。要實現後者,實際上只需要將輸入文件的逗號分隔列表提供給newAPIHadoopFile()加載器。所以假設你的GetAvroList()函數可以工作,你可以這樣做:

val avroList = GetAvroList(fs, args(0)) 
val avroRDD = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](avroList.mkString(",")) 
val countsRDD = avroRDD.map(av => (av._1.datum.get("field1").toString, 1)).reduceByKey(_ + _) 
flatMappedRDD.foreach(println) 
+0

實際上,我正在處理多個avro文件。讓我發佈完整的代碼,使其有意義。 –

+0

每個avro文件生成一個RDD [(String,Int)]該平面圖用於從所有avro文件中獲得單個RDD [(String,Int)]。 –

+1

@KnowsNotMuch更新,讓我知道,如果這有幫助! –