我想在下面的代碼中處理多個avro文件。這個想法是首先在列表中獲得一系列avro文件。然後打開每個avro文件並生成元組(String,int)。然後最後通過密鑰對元組流進行分組,然後求和整數。在返回RDD的函數上做flatmap
object AvroCopyUtil {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf().setAppName("Leads Data Analysis").setMaster("local[*]")
val sc = new SparkContext(conf)
val fs = FileSystem.get(new Configuration())
val avroList = GetAvroList(fs, args(0))
avroList.flatMap(av =>
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av)
.map(r => (r._1.datum.get("field").toString, 1)))
.reduceByKey(_ + _)
.foreach(println)
}
def GetAvroList(fs: FileSystem, input: String) : List[String] = {
// get all children
val masterList : List[FileStatus] = fs.listStatus(new Path(input)).toList
val (allFiles, allDirs) = masterList.partition(x => x.isDirectory == false)
allFiles.map(_.getPath.toString) ::: allDirs.map(_.getPath.toString).flatMap(x => GetAvroList(fs, x))
}
}
編譯錯誤我得到的是
[error] found : org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[error] required: TraversableOnce[?]
[error] avroRdd.flatMap(av => sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av))
[error] ^
[error] one error found
編輯:基於下面我試圖
val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](avroList.mkString(","))
,但我得到了錯誤
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in scheme name at index 0: 2015-10-
15-00-1576041136-flumetracker.foo.com-FooAvroEvent.1444867200044.avro,hdfs:
實際上,我正在處理多個avro文件。讓我發佈完整的代碼,使其有意義。 –
每個avro文件生成一個RDD [(String,Int)]該平面圖用於從所有avro文件中獲得單個RDD [(String,Int)]。 –
@KnowsNotMuch更新,讓我知道,如果這有幫助! –