2017-04-24 65 views
0

我必須添加具有UUID值的新列。我已經使用Spark 1.4 Java使用以下代碼完成了此操作。Spark SQL使用UUID生成新列

// Append a new nullable String column (named by leftCol) to the DataFrame's
// schema, then populate it with a random UUID string for every row.
StructType objStructType = inputDataFrame.schema();
// New schema = all existing fields followed by the UUID column.
List<StructField> newfields = new ArrayList<StructField>(Arrays.asList(objStructType.fields()));
newfields.add(DataTypes.createStructField(leftCol, DataTypes.StringType, true));
final int size = objStructType.size();

JavaRDD<Row> rowRDD = inputDataFrame.javaRDD().map(new Function<Row, Row>() {
    private static final long serialVersionUID = 3280804931696581264L;

    public Row call(Row tblRow) throws Exception {
        // Copy the existing row values and append the generated UUID at the end.
        Object[] newRow = new Object[size + 1];
        for (int itr = 0; itr < tblRow.length(); itr++) {
            // No explicit null check needed: array slots default to null anyway.
            newRow[itr] = tblRow.apply(itr);
        }
        newRow[size] = UUID.randomUUID().toString();
        return RowFactory.create(newRow);
    }
});

inputDataFrame = objsqlContext.createDataFrame(rowRDD, DataTypes.createStructType(newfields));

我想知道在Spark 2中是否有更簡潔的方式來實現這一點。請指教。

回答

0

您可以註冊udf獲取UUID並使用callUDF函數爲您的inputDataFrame添加新列。請參閱使用Spark 2.0的示例代碼。

/**
 * Demonstrates adding a UUID column to a DataFrame in Spark 2.x by
 * registering a UDF and attaching its result via {@code withColumn}.
 */
public class SparkUUIDSample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkUUIDSample")
                .master("local[*]")
                .getOrCreate();

        // Sample input data.
        List<Tuple2<String, String>> inputList = new ArrayList<Tuple2<String, String>>();
        inputList.add(new Tuple2<String, String>("A", "v1"));
        inputList.add(new Tuple2<String, String>("B", "v2"));

        // Build a two-column DataFrame ("key", "value") from the tuples.
        Dataset<Row> df = spark
                .createDataset(inputList, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                .toDF("key", "value");
        df.show();

        // Register a UDF that ignores its input and returns a fresh UUID string.
        // NOTE(review): the UDF is non-deterministic — Spark may re-evaluate it
        // if the plan is recomputed, yielding different UUIDs per evaluation;
        // persist/cache the result if stable values are required.
        UDF1<String, String> uuid = str -> UUID.randomUUID().toString();
        spark.udf().register("uuid", uuid, DataTypes.StringType);

        // withColumn gives the generated column a readable name ("uuid")
        // instead of the auto-generated "UDF(value)" produced by a bare select.
        df.withColumn("uuid", callUDF("uuid", col("value"))).show();

        spark.stop();
    }
}