2015-04-24 59 views
15

我有一個CSV字段,其中字段是特定格式的日期時間。我不能直接在我的Dataframe中導入它,因爲它需要是一個時間戳。所以我將其導入爲字符串,並將其轉換成Timestamp這樣在Spark中將字符串字段轉換爲時間戳的更好方法

import java.sql.Timestamp 
import java.text.SimpleDateFormat 
import java.util.Date 
import org.apache.spark.sql.Row 

def getTimestamp(x:Any) : Timestamp = { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    if (x.toString() == "") 
    return null 
    else { 
     val d = format.parse(x.toString()); 
     val t = new Timestamp(d.getTime()); 
     return t 
    } 
} 

def convert(row : Row) : Row = { 
    val d1 = getTimestamp(row(3)) 
    return Row(row(0),row(1),row(2),d1) 
} 

是否還有更好的,更簡潔的方式來做到這一點,與數據幀API或火花-SQL?上述方法需要創建RDD並再次爲Dataframe提供模式。

回答

6

我還沒有星火SQL打了,但是我認爲這將是更地道斯卡拉(空的使用不被認爲是一個很好的做法):

def getTimestamp(s: String) : Option[Timestamp] = s match { 
    case "" => None 
    case _ => { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    Try(new Timestamp(format.parse(s).getTime)) match { 
     case Success(t) => Some(t) 
     case Failure(_) => None 
    }  
    } 
} 

請注意我假定你知道元素類型事先(如果你從csv文件讀取它們,它們都是String),這就是爲什麼我使用String而不是Any(所有東西都是Any的子類型)的適當類型。

這也取決於你想如何處理解析異常。在這種情況下,如果發生解析異常,則簡單地返回None

你可能會進一步上使用它:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3)) 
+0

我以前做過這個。我感覺我應該解決核心問題,然後再轉向這種細微之處。如果有更好的解決方案,可能不必這樣做。問題是關於rows.map,它返回rdd並需要轉換爲ddf。所以可能是ddf API缺乏,或者我不知道如何去做。 – user568109

+0

我不知道是否有其他方法,但是您可以將任何RDD轉換爲DF,而不會出現任何問題。在'sqlContext.createDataFrame(rowRDD,schema)'的具體例子中。對我來說,sql很好地以類似SQL的方式查詢數據,而不是解析數據本身(對於這樣的事情,使用簡單的RDD)。 – jarandaf

+0

嘗試(新時間戳(format.parse(s).getTime))。toOption – nont

1

我想移動getTimeStamp方法寫的,你到RDD的mapPartitions和重用GenericMutableRow行之間在一個迭代:

val strRdd = sc.textFile("hdfs://path/to/cvs-file") 
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter => 
    new Iterator[Row] { 
    val row = new GenericMutableRow(4) 
    var current: Array[String] = _ 

    def hasNext = iter.hasNext 
    def next() = { 
     current = iter.next() 
     row(0) = current(0) 
     row(1) = current(1) 
     row(2) = current(2) 

     val ts = getTimestamp(current(3)) 
     if(ts != null) { 
     row.update(3, ts) 
     } else { 
     row.setNullAt(3) 
     } 
     row 
    } 
    } 
} 

而且您仍然應該使用架構來生成DataFrame

val df = sqlContext.createDataFrame(rowRdd, tableSchema) 

GenericMutableRow的迭代器實現內部的使用可以找到Aggregate OperatorInMemoryColumnarTableScanParquetTableOperations

+0

它非常接近我的實際代碼。另外如果你想分析csv文件,你應該使用spark-csv而不是split。我想要做的就是添加和變異列將返回一個rdd,它將再次需要通過提供架構轉換爲ddf。有一條較短的路線。 – user568109

+0

@ user568109,我不認爲有一個。由於spark-sql需要一個模式,它必須以某種方式得到一個模式。如果您使用RDD [CaseClassX],則spark-sql將根據案例類的定義自動爲您推理模式。但你在這裏使用的是一個Row(Array [Any]),沒有DataType推斷可以去那裏,所以你只需要傳遞一個。 –

+0

我認爲,使用一個參考,每次突變並將其作爲參考返回是一個災難的祕訣。你真的成功地使用了這種方法嗎? – maasg

1

我有我的數據集ISO8601時間戳,我需要爲「YYYY-MM-DD」格式轉換。這是我做的:

import org.joda.time.{DateTime, DateTimeZone} 
object DateUtils extends Serializable { 
    def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC) 
    def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC) 
} 

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd")) 

而你可以在你的spark SQL查詢中使用UDF。

31

火花> = 2.2

import org.apache.spark.sql.functions.to_timestamp 

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") 
df.withColumn("ts", ts).show(2, false) 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+-------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+-------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01| 
// |2 |#[email protected]#@#    |null    | 
// +---+-------------------+-------------------+ 

火花> = 1.6,2.2 <

可以使用已在火花1.5被引入日期處理功能。假設你有以下數據:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts") 

您可以使用unix_timestamp解析字符串,並投它爲timestamp

import org.apache.spark.sql.functions.unix_timestamp 

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+---------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+---------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| 
// |2 |#[email protected]#@#    |null     | 
// +---+-------------------+---------------------+ 

正如你可以看到它涵蓋了解析和錯誤處理。

Spark> = 1。5,< 1.6

你將不得不使用使用這樣的事情:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp") 

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp") 

由於SPARK-11724

星火< 1.5

你應該能夠與exprHiveContext使用這些。

0

我會用https://github.com/databricks/spark-csv

這將推斷時間戳爲您服務。

import com.databricks.spark.csv._ 
val rdd: RDD[String] = sc.textFile("csvfile.csv") 

val df : DataFrame = new CsvParser().withDelimiter('|') 
     .withInferSchema(true) 
     .withParseMode("DROPMALFORMED") 
     .csvRdd(sqlContext, rdd) 
相關問題