Spark SQL: parse timestamp without seconds

Some data I receive comes with a field that should be a timestamp, but at times it doesn't appear to comply with the ISO 8601 standard.

In my code I define a schema and then parse the JSON data with Spark SQL, and I get the following error:

java.lang.IllegalArgumentException: 2016-10-07T11:15Z 

The source data contains:

"transaction_date_time": "2016-10-07T11:15Z" 

And my schema is defined as:
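
val schema = (new StructType)
     .add("transaction_date_time", TimestampType)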

I believe this is because the seconds are missing. How can I get the timestamp parsed correctly?

Edit: for example, running

spark.read.schema(schema).json(rdd).show() 

triggers the following error:

16/10/24 13:06:27 ERROR Executor: Exception in task 6.0 in stage 5.0 (TID 23) 
java.lang.IllegalArgumentException: 2016-10-07T11:15Z 
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source) 
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source) 
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source) 
    at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source) 
    at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422) 
    at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417) 
    at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327) 
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285) 
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285) 
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/10/24 13:06:27 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 23, localhost): java.lang.IllegalArgumentException: 2016-10-07T11:15Z 
    ... (same stack trace as above)

16/10/24 13:06:27 ERROR TaskSetManager: Task 6 in stage 5.0 failed 1 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 5.0 failed 1 times, most recent failure: Lost task 6.0 in stage 5.0 (TID 23, localhost): java.lang.IllegalArgumentException: 2016-10-07T11:15Z 
    ... (same stack trace as above)

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2139) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 54 elided 
Caused by: java.lang.IllegalArgumentException: 2016-10-07T11:15Z 
    ... (same stack trace as above)

Answers

0

It looks like the Timestamp that apache.spark uses when reading in your data is just a wrapper around java.sql.Timestamp. At least that's what this leads me to believe.

So we can use a SimpleDateFormat to parse the date, extract the milliseconds, and pass them to the Timestamp constructor.

You could pre-process the data with something like this example:

import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Test {
    public static void main(String[] args) {
        String timestamp = "2016-10-07T11:15Z";
        // "XXX" parses an ISO 8601 zone designator, including the literal "Z"
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mmXXX");
        Date parsedDate;
        try {
            parsedDate = df.parse(timestamp);
        } catch (ParseException e) {
            // fail loudly instead of swallowing the error and hitting an NPE below
            throw new IllegalArgumentException("Unparseable timestamp: " + timestamp, e);
        }
        Timestamp ts = new Timestamp(parsedDate.getTime());
        System.out.println(parsedDate);
        System.out.println(ts);
    }
}

Fri Oct 07 04:15:00 PDT 2016 
2016-10-07 04:15:00.0 

I searched a bit for "optional parts in date formats" and found this SO answer, which says you should just use two SimpleDateFormats.
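
In Scala, that two-format fallback could look something like this (a minimal sketch with illustrative names, not tied to Spark): try the pattern with seconds first, then fall back to the secondless one.

import java.text.SimpleDateFormat
import java.util.Date

import scala.util.Try

object LenientParse {
  // Each pattern rejects the other form, so at most one succeeds per input.
  private val patterns = Seq(
    "yyyy-MM-dd'T'HH:mm:ssXXX", // with seconds
    "yyyy-MM-dd'T'HH:mmXXX"     // without seconds: matches "2016-10-07T11:15Z"
  )

  def parse(s: String): Option[Date] =
    patterns.view
      .flatMap(p => Try(new SimpleDateFormat(p).parse(s)).toOption)
      .headOption
}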

+0

I don't think parsing this date is the hard part; even the Java 8 Time API handles it out of the box. The problem is on the Spark side, I guess; the challenge is wiring your code in alongside the code above (which I just edited to explain better) – Stephane
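
For reference, the out-of-the-box java.time parse mentioned above could look like this (a sketch: OffsetDateTime accepts the secondless form because ISO-8601 treats seconds as optional in local times):

import java.sql.Timestamp
import java.time.OffsetDateTime

// Seconds default to zero; the trailing "Z" is parsed as the UTC offset.
val odt = OffsetDateTime.parse("2016-10-07T11:15Z")
val ts  = Timestamp.from(odt.toInstant)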

+0

I dug around the docs and found the [Encoders trait](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoders$). I think you need to define a custom encoder and use it in place of Timestamp. [Here is another link that looks more useful](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Encoder.html). Still wandering around, but figured I'd post what I found – tenCupMaximum

+0

Looks like if you just extend [DataType](https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/types/DataType.html), you can pass it to the same 'add' function you're already using on [StructType](https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/types/StructType.html) – tenCupMaximum

2

You can change

val schema = (new StructType) 
     .add("transaction_date_time", TimestampType) 

to

val schema = (new StructType) 
     .add("transaction_date_time", StringType) 

and then use df.withColumn("columnTimeWithOutSec", unix_timestamp($"time", format))

where format is a SimpleDateFormat pattern without the seconds, for example "yyyy-MM-dd'T'HH:mm".

Just like this...
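
Put together, a minimal sketch of this approach (assuming the question's spark.read.schema(schema).json(rdd) setup, and using col(...) instead of $ to avoid the implicits import; the XXX suffix is an addition here so the trailing "Z" is honored as UTC instead of being silently dropped):

import org.apache.spark.sql.functions.{col, unix_timestamp}
import org.apache.spark.sql.types._

val schema = (new StructType)
  .add("transaction_date_time", StringType)

val df = spark.read.schema(schema).json(rdd)

// unix_timestamp parses with a Java SimpleDateFormat pattern and returns
// seconds since the epoch; cast the result back to a proper timestamp.
val withTs = df.withColumn(
  "transaction_date_time",
  unix_timestamp(col("transaction_date_time"), "yyyy-MM-dd'T'HH:mmXXX")
    .cast(TimestampType))

Rows that still fail to parse come back as null rather than aborting the job, which also sidesteps the IllegalArgumentException above.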

Also, have a look at DateTimeUtils.scala for Spark's built-in conversions between Date and Timestamp.