
Adding a new column to a Spark SQL DataFrame using the Java API and JavaRDD<Row>

I am trying to create a new DataFrame (in Spark SQL 1.6.2) after applying a mapPartitions function, as follows:

// Map each partition's rows, appending a placeholder 0L for the new TTF column.
FlatMapFunction<Iterator<Row>, Row> mapPartitonstoTTF = rows -> {
    List<Row> mappedRows = new ArrayList<Row>();
    while (rows.hasNext()) {
        Row row = rows.next();
        Row mappedRow = RowFactory.create(row.getDouble(0), row.getString(1), row.getLong(2),
                row.getDouble(3), row.getInt(4), row.getString(5), row.getString(6),
                row.getInt(7), row.getInt(8), row.getString(9), 0L);
        mappedRows.add(mappedRow);
    }
    return mappedRows;
};


JavaRDD<Row> sensorDataDoubleRDD = oldsensorDataDoubleDF.toJavaRDD().mapPartitions(mapPartitonstoTTF);

StructType oldSchema = oldsensorDataDoubleDF.schema();
StructType newSchema = oldSchema.add("TTF", DataTypes.LongType, false);

System.out.println("The new schema is: ");
newSchema.printTreeString();

System.out.println("The old schema is: ");
oldSchema.printTreeString();

// hc is the existing SQLContext/HiveContext.
DataFrame sensorDataDoubleDF = hc.createDataFrame(sensorDataDoubleRDD, newSchema);
sensorDataDoubleDF.show();

As can be seen above, I append a new LongType column with the value 0L to each row of the RDD using the RowFactory.create() function.
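For context, createDataFrame matches the values of each Row to the schema's fields strictly by position, so every Row must carry exactly as many values as the schema has fields. A minimal illustration of that contract (the field names here are made up):

StructType schema = new StructType(new StructField[] {
    DataTypes.createStructField("name", DataTypes.StringType, true),
    DataTypes.createStructField("count", DataTypes.LongType, false)
});
// Valid for this schema: exactly 2 values, types matching in order.
Row good = RowFactory.create("sensorA", 42L);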

However, when I run the line sensorDataDoubleDF.show(); I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 117 in stage 26.0 failed 4 times, most recent failure: Lost task 117.3 in stage 26.0 (TID 3249, AUPER01-01-20-08-0.prod.vroc.com.au): scala.MatchError: 1435766400001 (of class java.lang.Long) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

The old schema is:

root 
|-- data_quality: double (nullable = false) 
|-- data_sensor: string (nullable = true) 
|-- data_timestamp: long (nullable = false) 
|-- data_valueDouble: double (nullable = false) 
|-- day: integer (nullable = false) 
|-- dpnode: string (nullable = true) 
|-- dsnode: string (nullable = true) 
|-- month: integer (nullable = false) 
|-- year: integer (nullable = false) 
|-- nodeid: string (nullable = true) 
|-- nodename: string (nullable = true) 

The new schema is the same as above, with the TTF column added as a LongType:

root 
|-- data_quality: double (nullable = false) 
|-- data_sensor: string (nullable = true) 
|-- data_timestamp: long (nullable = false) 
|-- data_valueDouble: double (nullable = false) 
|-- day: integer (nullable = false) 
|-- dpnode: string (nullable = true) 
|-- dsnode: string (nullable = true) 
|-- month: integer (nullable = false) 
|-- year: integer (nullable = false) 
|-- nodeid: string (nullable = true) 
|-- nodename: string (nullable = true) 
|-- TTF: long (nullable = false) 

I would appreciate any help in figuring out where I am making a mistake.

Answer


You have 11 columns in the old schema, but you are only mapping 10 of them. With ten old values plus the appended 0L, each mapped Row carries 11 values against the 12-field new schema, so the values fall out of alignment with the fields and a Long ends up in a position where the schema expects a String, which is what the scala.MatchError is reporting. Add a row.getString(10) call for the remaining column to the RowFactory.create invocation:

Row mappedRow = RowFactory.create(row.getDouble(0), row.getString(1), row.getLong(2),
        row.getDouble(3), row.getInt(4), row.getString(5), row.getString(6),
        row.getInt(7), row.getInt(8), row.getString(9), row.getString(10), 0L);
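A side note on robustness: hard-coding every field index is fragile, since any upstream schema change silently breaks the mapping again. A more defensive sketch (my illustration, not part of the original answer) copies all existing values generically before appending the new one:

// Inside the mapPartitions loop: copy every existing value, whatever the
// column count, then append the new TTF value in the last position.
Object[] values = new Object[row.size() + 1];
for (int i = 0; i < row.size(); i++) {
    values[i] = row.get(i);
}
values[row.size()] = 0L; // new TTF column, initialised to 0
Row mappedRow = RowFactory.create(values);

And if the goal is only to append a constant column, Spark's built-in withColumn with functions.lit sidesteps the manual row mapping and schema handling entirely (a sketch, assuming the same oldsensorDataDoubleDF):

import static org.apache.spark.sql.functions.lit;

// One line replaces the mapPartitions + createDataFrame round trip.
DataFrame sensorDataDoubleDF = oldsensorDataDoubleDF.withColumn("TTF", lit(0L));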