2015-02-24 64 views
1

我正在從Scro文件讀取數據的Scala中的Spark工作。 開始是很簡單:如何用Scala以優雅的方式處理Spark中的Avro

val path = "hdfs:///path/to/your/avro/folder" 
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path) 

但後來它不是優雅的,因爲我需要的元組,即操作。

avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])). 
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))). 
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)). 
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(",")) 
... 

我想使用地圖,而不是元組,但如果我有幾個不同的類型,即長和絃,它就會導致Map[String,Any]澆鑄在每個操作。 即

avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])). 
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))). 
... 

替代解決方案是使用case類和包裝值到它,但有時它會導致很多case類即定義:

case class TestClass(value: Long, level:Double, size:Long, category:String) 

avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))). 
map(x => (asDate(x._1),x._2)). 
reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)). 
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(",")) 
... 

我想知道是否有更好的辦法在這種情況下處理通用記錄 - 您不需要不斷地轉換爲特定類型,並且可以對域名進行操作。像命名元組就可以完成這項工作。

你知道更好的方法嗎?

你如何處理這種情況?

+0

正在使用特定的記錄是否可以接受? – aaronman 2015-02-24 19:47:07

+0

不,我知道如果我將使用avro生成的類,我將有更簡單的第一部分,但仍然選擇值的子集仍然會是複雜的 - 後面的部分。 – baju 2015-02-24 19:48:41

+1

不知道如何使avro更好,但模式匹配可以清理深層嵌套的元組 – aaronman 2015-02-24 20:58:33

回答

2

隨着模式匹配:

map { case (value, startTime, level, size, category) => 
    (asDate(startTime), (value,level,size,category)) 
}.reduceByKey { case ((value1, level1, size1, category1), (value2, level2, size2, category2)) => 
    (value1+value2, level1+level2, size1+size2, category2) 
}.map { case (startTime, (value, level, size, category)) => 
    List(startTime, value, level, size, category).mkString(",")) 
} 

如果你有一些元組其中獲得經常重複使用,用例類他們。

+0

對於頻繁重用,您也可以提取本地函數/方法,清理一點 – 2015-02-25 13:58:03

相關問題