我正在從Scro文件讀取數據的Scala中的Spark工作。 開始是很簡單:如何用Scala以優雅的方式處理Spark中的Avro
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但後來它不是優雅的,因爲我需要的元組,即操作。
avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])).
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))).
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)).
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(","))
...
我想使用地圖,而不是元組,但如果我有幾個不同的類型,即長和絃,它就會導致Map[String,Any]
澆鑄在每個操作。 即
avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])).
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
...
替代解決方案是使用case類和包裝值到它,但有時它會導致很多case類即定義:
case class TestClass(value: Long, level:Double, size:Long, category:String)
avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
map(x => (asDate(x._1),x._2)).
reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)).
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(","))
...
我想知道是否有更好的辦法在這種情況下處理通用記錄 - 您不需要不斷地轉換爲特定類型,並且可以對域名進行操作。像命名元組就可以完成這項工作。
你知道更好的方法嗎?
你如何處理這種情況?
正在使用特定的記錄是否可以接受? – aaronman 2015-02-24 19:47:07
不,我知道如果我將使用avro生成的類,我將有更簡單的第一部分,但仍然選擇值的子集仍然會是複雜的 - 後面的部分。 – baju 2015-02-24 19:48:41
不知道如何使avro更好,但模式匹配可以清理深層嵌套的元組 – aaronman 2015-02-24 20:58:33