2016-03-17 69 views
3

我有一個從我遇到的特定異常派生的一般問題。如何避免火花NumberFormatException:null

我使用spark 1.6查詢dataproc的數據。我需要從2個日誌中獲取1天的數據(〜10000個文件),然後進行一些轉換。

但是,我的數據可能(或可能不會)有一些不良的數據 在一整天的查詢沒有成功後,我嘗試了00-09小時,沒有發生錯誤。嘗試了10-19小時,並得到一個例外。一小時一小時地嘗試,發現不好的數據是小時:10。小時圖11和12都很好

基本上我的代碼是:

val imps = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*").select("C0","C18","C7","C9","C33","C29","C63").registerTempTable("imps") 

val conv = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").load("gs://logs.xxxx.com/2016/03/14/xxxxx/conv/2016-03-14-10*").select("C0","C18","C7","C9","C33","C29","C65").registerTempTable("conversions") 

val ff = sqlContext.sql("select * from (select * from imps) A inner join (select * from conversions) B on A.C0=B.C0 and A.C7=B.C7 and A.C18=B.C18 ").coalesce(16).write.format("com.databricks.spark.csv").save("gs://xxxx-spark-results/newSparkResults/Plara2.6Mar14_10_1/") 

{過 - 簡化}

我得到的錯誤是:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 130.0 failed 4 times, most recent failure: Lost task 38.3 in stage 130.0 (TID 88495, plara26-0317-0001-sw-v8oc.c.xxxxx-analytics.internal): java.lang.NumberFormatException: null 
    at java.lang.Integer.parseInt(Integer.java:542) 
    at java.lang.Integer.parseInt(Integer.java:615) 
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) 
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31) 
    at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:53) 
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:181) 
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

所以我的問題是 - 如何實現異常處理使用spark-csv? 我可以將數據幀轉換爲RDD並在其中工作,但似乎必須有更好的方法.....

任何人都解決了類似的問題?

+1

更新:在更改選項以將模式推斷爲false後,我已能夠獲取我的數據。這種方式字段被讀爲字符串,轉換爲Int當然是不必要的。 我仍然在尋找一個強大的解決方案來捕捉異常..... –

回答

0

這是因爲自動推斷模式對輸入文件中的無效數據不安全。

當使用不同的輸入文件時,這可能導致數據幀架構不同。

假設我們有一個花車csv文件,沾染的字符串:

0.018 
0.095 
0.000 
'hoi' 
0.000 
0.093 
0.012 

當我們讀入使用則InferSchema一個數據幀,像這樣的:

>>> df = spark.read.format('csv').option('inferSchema', True).load('./test_csv.dat') 
>>> df.show() 
+-----+ 
| _c0| 
+-----+ 
|0.018| 
|0.095| 
|0.000| 
|'hoi'| 
|0.000| 
|0.093| 
|0.012| 
+-----+ 

那麼該類型未正確推斷:

>>> df.schema 
StructType(List(StructField(_c0,StringType,true))) 

你c通過手動鑄造列解決此問題,如下所示:

>>> df = df.withColumn('val_float', df_tmp._c0.cast(FloatType())).select('val_float') 
>>> df.show() 
+---------+ 
|val_float| 
+---------+ 
| 0.018| 
| 0.095| 
|  0.0| 
|  null| 
|  0.0| 
| 0.093| 
| 0.012| 
+---------+ 

>>> df.schema 
StructType(List(StructField(val_float,FloatType,true)))