2016-09-22 73 views
0

我想使用parquet格式將DStream保存到HDFS中。問題是我的case類使用joda.DateTime,而Spark SQL不支持這一點。例如:將case類的DStream轉換爲joda.DateTime轉換爲Spark DataFrame

case class Log (timestamp: DateTime, ...dozen of other fields here...) 

但我得到的錯誤:java.lang.UnsupportedOperationException:類型org.joda.time.DateTime模式試圖RDD轉換爲DF時,不支持:

def output(logdstream: DStream[Log]) { 
     logdstream.foreachRDD(elem => { 
      val df = elem.toDF() 
      df.saveAsParquet(...) 
     }); 
    } 

我模型很複雜並且有很多字段,所以我不想編寫不同的案例類來擺脫joda.DateTime。另一種選擇是直接從json保存到實木複合地板,但這並不理想。是否有一種簡單的方法可以將joda.DateTime自動轉換爲與spark一起使用的sql.Timestamp(轉換爲Spark的數據框)。

謝謝。

回答

0

這是一個有點冗長,但你一試映射登錄到SQL星火行:

logdstream.foreachRDD(rdd => { 
    rdd.map(log => Row(
    log.timestamp.toDate, 
    log.field2, 
    ... 
)).toDF().saveAsParquest(...) 
}) 
+0

嗨,不知道如果我理解正確。但是錯誤發生在語句中:val df = elem.toDF();換句話說,我無法使用.toDF()函數將RDD [Log]轉換爲數據框。你建議的解決方案似乎假定df已經可用? – auxdx

+0

你說得對,我錯過了。我改變了答案。 – bear911

相關問題