2017-05-24 77 views

Answers

3

For Java 7, you need to define a mapping function:

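// Java 7 has no lambda syntax, so the function is written as an anonymous class.
public static final Function<Tuple2<String, String>, Row> mappingFunc = new Function<Tuple2<String, String>, Row>() {
    @Override
    public Row call(Tuple2<String, String> tuple) {
        return RowFactory.create(tuple._1(), tuple._2());
    }
};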

Now you can pass this function to map to get a JavaRDD<Row>:

JavaRDD<Row> rowRDD = filesRDD.map(mappingFunc); 

For Java 8, it is as simple as:

JavaRDD<Row> rowRDD = filesRDD.map(tuple -> RowFactory.create(tuple._1(),tuple._2())); 

Another way to get a DataFrame from a JavaPairRDD is:

DataFrame df = sqlContext.createDataset(JavaPairRDD.toRDD(filesRDD), Encoders.tuple(Encoders.STRING(),Encoders.STRING())).toDF(); 
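
To make the two syntaxes above easier to compare without a Spark cluster, here is a minimal plain-Java sketch of the same pair-to-row mapping. It is only an illustration under stated assumptions: Map.Entry stands in for Tuple2, a List<Object> stands in for Row, java.util.function.Function stands in for Spark's Function interface, and the file name "file1.csv" is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PairToRowSketch {
    // Assumption: Map.Entry plays the role of Tuple2<String, String>
    // and List<Object> plays the role of a two-column Row.

    // Anonymous-class form, the only option before Java 8 lambdas.
    static final Function<Map.Entry<String, String>, List<Object>> ANONYMOUS_CLASS_STYLE =
        new Function<Map.Entry<String, String>, List<Object>>() {
            @Override
            public List<Object> apply(Map.Entry<String, String> pair) {
                return Arrays.<Object>asList(pair.getKey(), pair.getValue());
            }
        };

    // Lambda form of exactly the same mapping (Java 8 and later).
    static final Function<Map.Entry<String, String>, List<Object>> LAMBDA_STYLE =
        pair -> Arrays.<Object>asList(pair.getKey(), pair.getValue());

    public static void main(String[] args) {
        // Hypothetical (path, content) pair.
        Map.Entry<String, String> pair = new SimpleEntry<>("file1.csv", "1, john\n2, steve");
        // Both forms build the same two-column row.
        System.out.println(ANONYMOUS_CLASS_STYLE.apply(pair).equals(LAMBDA_STYLE.apply(pair)));
    }
}
```

Either form turns each pair into exactly one two-column row, so if the pairs are (path, content) entries the result has one row per pair, not one row per line of content.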

0

Here is one way you can achieve this.

    //Read whole files 
    JavaPairRDD<String, String> pairRDD = sparkContext.wholeTextFiles(path); 

    //create a structType for creating the dataframe later. You might want to 
    //do this in a different way if your schema is big/complicated. For the sake of this 
    //example I took a simple one. 
    StructType structType = DataTypes 
      .createStructType(
        new StructField[]{ 
          DataTypes.createStructField("id", DataTypes.StringType, true) 
          , DataTypes.createStructField("name", DataTypes.StringType, true)}); 


    //create an RDD<Row> from pairRDD 
    JavaRDD<Row> rowJavaRDD = pairRDD.values().flatMap(new FlatMapFunction<String, Row>() { 
     public Iterable<Row> call(String s) throws Exception { 
      List<Row> rows = new ArrayList<Row>(); 
      for (String line : s.split("\n")) { 
       String[] values = line.split(","); 
       Row row = RowFactory.create(values[0], values[1]); 
       rows.add(row); 
      } 
      return rows; 
     } 
    }); 


    //Create the DataFrame and print it. 
    DataFrame df = sqlContext.createDataFrame(rowJavaRDD, structType); 
    df.show(); 

Sample data I used:
File 1:

1, john 
2, steve 

File 2:

3, Mike 
4, Mary 

Output from df.show():

+---+------+
| id|  name|
+---+------+
|  1|  john|
|  2| steve|
|  3|  Mike|
|  4|  Mary|
+---+------+
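
One thing to watch in the parsing step: each line is split on "," only, so with the sample files every name keeps its leading space (" john"), and an empty line in the middle of a file would throw an ArrayIndexOutOfBoundsException at values[1]. Below is a minimal plain-Java sketch of a more defensive version of that loop. parseRows is a hypothetical helper name, rows are modeled as List<String> instead of Spark Row objects, and silently skipping malformed lines is an assumption about what you want.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RowParsingSketch {
    // Hypothetical helper: turns the content of one file into (id, name) rows.
    // In the Spark job, each row would be built with RowFactory.create(...) instead.
    static List<List<String>> parseRows(String content) {
        List<List<String>> rows = new ArrayList<>();
        // Accept both Unix and Windows line endings.
        for (String line : content.split("\\r?\\n")) {
            String[] values = line.split(",");
            if (values.length < 2) {
                continue; // assumption: skip blank or malformed lines instead of failing
            }
            // trim() removes the space after the comma seen in the sample data.
            rows.add(Arrays.asList(values[0].trim(), values[1].trim()));
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(parseRows("1, john \n2, steve \n"));
    }
}
```

The body of the loop can be moved into the call method of the FlatMapFunction more or less unchanged. Whether bad lines should be dropped or reported depends on your data.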