
scala.MatchError in SparkR (DataFrame with Spark SQL)

I have created a DataFrame from a Cloudant DB (IBM Bluemix Spark R notebook). Its schema is:

root 
|-- c1_x: double (nullable = true) 
|-- c2_y: double (nullable = true) 
|-- c3_z: double (nullable = true) 
|-- name: string (nullable = true) 
|-- c11_x: double (nullable = true) 
|-- c12_y: double (nullable = true) 
|-- c13_z: double (nullable = true) 
|-- l1: double (nullable = true) 
|-- l2: double (nullable = true) 
|-- c21_x: double (nullable = true) 
|-- c22_y: double (nullable = true) 
|-- c23_z: double (nullable = true) 
|-- pre: long (nullable = true) 
|-- timestamp: string (nullable = true) 

printSchema(payloadMessagesDF) 
showDF(payloadMessagesDF) # Getting error 
head(select(payloadMessagesDF, payloadMessagesDF$magnetometer_x)) # Getting error 

I am seeing the following error message:

Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...): org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 229.0 failed 1 times, most recent failure: Lost task 0.0 in stage 229.0 (TID 1625, localhost): scala.MatchError: -39.099998474121094 (of class java.lang.Double) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:59) 
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:56) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
    at java.lang.Thread.run(Thread.java:785) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at java.lang.Thread.getStackTrace(Thread.java:1117) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1863) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) 
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) 
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) 
    at java.lang.reflect.Method.invoke(Method.java:507) 
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141) 
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86) 
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 

What connector are you using? – brobes

Answer


I assume you are using the Cloudant connector "com.cloudant.spark". I was able to reproduce your error (not exactly the same one, but a similar type-mismatch error). I created a Cloudant database like yours:

{ 
    "_id": "242b9eda90bd1c66730c0313c20f1d4e", 
    "_rev": "1-8ed1b69d2440e98d8186928b61bf834d", 
    "c1_x": 10.678, 
    "c2_y": 20.678, 
    "c3_z": 20.678, 
    "name": "vim", 
    "c11_x": 20.678, 
    "c12_y": 20.678, 
    "c13_z": 20.678, 
    "l1": 20.678, 
    "l2": 20.678, 
    "c21_x": 20.678, 
    "c22_y": 20.678, 
    "c23_z": 20.678, 
    "pre": 2076677.6786767677, 
    "timestamp": "1419038000" 
} 

{ 
    "_id": "ac6570b8a20e6d5c94430593e600cbd1", 
    "_rev": "1-d78b053ae3383c9392e5f6c9377bb971", 
    "c1_x": null, 
    "c2_y": null, 
    "c3_z": null, 
    "name": null, 
    "c11_x": null, 
    "c12_y": null, 
    "c13_z": null, 
    "l1": null, 
    "l2": null, 
    "c21_x": null, 
    "c22_y": null, 
    "c23_z": null, 
    "pre": null, 
    "timestamp": null 
} 


{ 
    "_id": "78efe1ff33b8e67fe87dae43a5af516d", 
    "_rev": "1-26e1e3a502477d9710de1110acd49891", 
    "c1_x": "fsdf", 
    "c2_y": null, 
    "c3_z": null, 
    "name": null, 
    "c11_x": null, 
    "c12_y": null, 
    "c13_z": null, 
    "l1": null, 
    "l2": null, 
    "c21_x": null, 
    "c22_y": null, 
    "c23_z": null, 
    "pre": null, 
    "timestamp": null 
} 

The error shows up only when some of the documents contain values of the wrong type; for example, the third document above has "c1_x": "fsdf". I believe the com.cloudant.spark driver, or possibly Spark SQL, infers the schema by interpreting the values of the first document(s) it samples, and then presumably reduces them to a common, more general type.
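As a rough illustration of that type-widening behaviour, here is a minimal sketch using Spark's built-in JSON reader rather than the Cloudant connector (the file path and its contents are made up for the example):

# Rough analogy only: Spark's built-in JSON reader, not the Cloudant connector.
# Suppose /tmp/mixed.json contains one document per line:
#   {"c1_x": 10.678}
#   {"c1_x": "fsdf"}
mixedDF <- read.df(sqlContext, "file:///tmp/mixed.json", source = "json")
printSchema(mixedDF)
# root
#  |-- c1_x: string (nullable = true)   <- double and string values are widened
#                                          to the more general type, string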

So I would suggest two things:

1. Fix the bad values in your database.

2. Don't let Spark SQL infer the schema automatically; instead, provide a schema that declares the fields as string, and then cast them to double afterwards (a sketch follows below).
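A minimal sketch of the second suggestion, assuming the connector accepts a user-supplied schema; the host, credentials and database name below are placeholders, not values from the question:

# Declare every field as string so schema inference is bypassed.
stringSchema <- structType(
  structField("c1_x", "string"),
  structField("c2_y", "string"),
  # ... declare the remaining columns from the schema above as string as well ...
  structField("timestamp", "string"))

rawDF <- read.df(sqlContext,
                 "payload_messages",                  # placeholder database name
                 source = "com.cloudant.spark",
                 schema = stringSchema,
                 cloudant.host = "ACCOUNT.cloudant.com",
                 cloudant.username = "USERNAME",
                 cloudant.password = "PASSWORD")

# Then cast the numeric columns back to double.
typedDF <- withColumn(rawDF, "c1_x_dbl", cast(rawDF$c1_x, "double"))
head(select(typedDF, typedDF$c1_x_dbl))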

I hope it helps. Thanks, Charles.


You are right, @charles gomes. The problem was resolved by setting this property when using the spark-cloudant connector: "jsonstore.rdd.schemaSampleSize" = "-1". Thanks a lot. –
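For anyone hitting the same error, passing that property as a read option looks roughly like this. This is a sketch only: the host, credentials and database name are placeholders, and depending on the connector version the shorter option name schemaSampleSize may be expected instead of the long one used in the comment.

# Sketch of the fix from the comment above: sample every document when inferring
# the schema. All connection details are placeholders.
payloadMessagesDF <- read.df(sqlContext,
                             "payload_messages",
                             source = "com.cloudant.spark",
                             cloudant.host = "ACCOUNT.cloudant.com",
                             cloudant.username = "USERNAME",
                             cloudant.password = "PASSWORD",
                             jsonstore.rdd.schemaSampleSize = "-1")
showDF(payloadMessagesDF)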