我使用的Spark 1.6和Java 7轉換JavaPairRDD到數據幀中的星火的Java API
我有一雙RDD:
JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);
我想它的模式轉換成DataFrame
。
看來,首先我必須將pairRDD轉換爲RowRDD。
那麼如何從PairRDD創建RowRdd?
我使用的Spark 1.6和Java 7轉換JavaPairRDD到數據幀中的星火的Java API
我有一雙RDD:
JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);
我想它的模式轉換成DataFrame
。
看來,首先我必須將pairRDD轉換爲RowRDD。
那麼如何從PairRDD創建RowRdd?
對於Java 7,您需要定義一個地圖功能
public static final Function<Tuple2<String, String>,Row> mappingFunc = (tuple) -> {
return RowFactory.create(tuple._1(),tuple._2());
};
現在你可以調用這個函數來得到JavaRDD<Row>
JavaRDD<Row> rowRDD = filesRDD.map(mappingFunc);
對於Java 8,它就像是
JavaRDD<Row> rowRDD = filesRDD.map(tuple -> RowFactory.create(tuple._1(),tuple._2()));
另一種方式從JavaPairRDD獲得數據幀是
DataFrame df = sqlContext.createDataset(JavaPairRDD.toRDD(filesRDD), Encoders.tuple(Encoders.STRING(),Encoders.STRING())).toDF();
以下是您可以實現此目的的一種方法。
//Read whole files
JavaPairRDD<String, String> pairRDD = sparkContext.wholeTextFiles(path);
//create a structType for creating the dataframe later. You might want to
//do this in a different way if your schema is big/complicated. For the sake of this
//example I took a simple one.
StructType structType = DataTypes
.createStructType(
new StructField[]{
DataTypes.createStructField("id", DataTypes.StringType, true)
, DataTypes.createStructField("name", DataTypes.StringType, true)});
//create an RDD<Row> from pairRDD
JavaRDD<Row> rowJavaRDD = pairRDD.values().flatMap(new FlatMapFunction<String, Row>() {
public Iterable<Row> call(String s) throws Exception {
List<Row> rows = new ArrayList<Row>();
for (String line : s.split("\n")) {
String[] values = line.split(",");
Row row = RowFactory.create(values[0], values[1]);
rows.add(row);
}
return rows;
}
});
//Create Dataframe.
sqlContext.createDataFrame(rowJavaRDD, structType);
樣本數據我用
文件1:
1, john
2, steve
文件2:
3, Mike
4, Mary
從df.show輸出
():
+---+------+
| id| name|
+---+------+
| 1| john|
| 2| steve|
| 3| Mike|
| 4| Mary|
+---+------+