0
我試圖過濾DataFrame內容,使用Spark的1.5方法dropDuplicates()。 使用它與完全數據填充表(我的意思是沒有空單元)給出正確的結果,但是當我的CSV源包含空單元格(我會提供給你的源文件) - Spark throw ArrayIndexOutOfBoundsException。 我在做什麼錯?我已閱讀1.6.2版的Spark SQL和DataFrames教程,並未詳細介紹DataFrame操作。我也在閱讀「Learning Spark.Greenning-Fast大數據分析」一書,但是它是爲Spark 1.5編寫的,我不需要在那裏描述我需要的操作。我會很高興得到解釋或鏈接到手冊。 謝謝。Apache Spark SQL上下文dropDuplicates
package data;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
public class TestDrop {
public static void main(String[] args) {
DropData dropData = new DropData("src/main/resources/distinct-test.csv");
dropData.execute();
}
}
class DropData{
private String csvPath;
private JavaSparkContext sparkContext;
private SQLContext sqlContext;
DropData(String csvPath) {
this.csvPath = csvPath;
}
void execute(){
initContext();
DataFrame dataFrame = loadDataFrame();
dataFrame.show();
dataFrame.dropDuplicates(new String[]{"surname"}).show();
//this one fails too: dataFrame.drop("surname")
}
private void initContext() {
sparkContext = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName("Drop test"));
sqlContext = new SQLContext(sparkContext);
}
private DataFrame loadDataFrame() {
JavaRDD<String> strings = sparkContext.textFile(csvPath);
JavaRDD<Row> rows = strings.map(string -> {
String[] cols = string.split(",");
return RowFactory.create(cols);
});
StructType st = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("surname", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.StringType, true),
DataTypes.createStructField("sex", DataTypes.StringType, true),
DataTypes.createStructField("socialId", DataTypes.StringType, true)));
return sqlContext.createDataFrame(rows, st);
}
}
那你還期望什麼?你聲明瞭一些字段,如果不匹配,你會得到異常。這是預期的行爲。只是過濾出格式不正確的數據。 – zero323
你是什麼意思?我有「姓氏」欄。根據JavaDoc中的這一列,我期望Spark能夠篩選重複的行。 順便說一下,這是我的[csv文件](http://pastebin.com/NgE6NU8A) –