2017-07-06 34 views
0

我寫具有以下緩衝模式的UDAF元素:星火不能從「地圖類型」

bufferSchema: StructType = StructType(
    StructField("grades", MapType(StructType(StructField("subject", StringType) :: StructField("subject_type", StringType) :: Nil), 
     ArrayType(StructType(StructField("date", LongType) :: StructField("grade", IntegerType) :: Nil)))) :: Nil) 

它看起來像內部火花解釋主要類型爲GenericRowWithSchema,而不是簡單(字符串,串)。 所以每當我試着從地圖拉:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 

var buffer_scoresMap = buffer.getAs[Map[(String,String), Array[..]](0) 

buffer_scoresMap.get(("k1","k2"))返回None儘管這一關鍵肯定是在地圖上,我甚至看到它在調試。 我試圖將密鑰變異爲GenericRowWithSchema,然後回到(String,String),然後從地圖上獲取,但沒有運氣。

任何想法?

回答

1

確實,元組被轉換爲Structs,並且當它們是深嵌套列的一部分時,它們不會轉換回元組。換句話說,buffer_scoresMap卻有類型Map[Row, Array[..]],這樣你就可以創建一個從中獲取項目:

var buffer_scoresMap = buffer.getAs[Map[Row, Array[..]](0) 
buffer_scoresMap.get(Row("k1","k2")) // should not be None if key exists 

下面是證明了這一點很短的例子:

// create a simple DF with similar schema: 
case class Record(grades: Map[(String, String), Array[Int]]) 
val df = sc.parallelize(Seq(Record(Map(("a", "b") -> Array(1, 2))))).toDF("grades") 

// this indeed fails: 
df.rdd.map(r => r.getAs[Map[(String, String), Array[Int]]](0).get(("a", "b"))).first() // None 

// but this works: 
df.rdd.map(r => r.getAs[Map[Row, Array[Int]]](0).get(Row("a", "b"))).first() // Some(WrappedArray(1, 2))