2016-03-21 20 views
-1
SparkConf sparkConf = new SparkConf().setAppName("SummaryStatistics"); 
    JavaSparkContext spark = new JavaSparkContext(sparkConf); 

    JavaRDD<String> textFile = spark.textFile(args[0]); 

    JavaRDD<Vector> points = textFile.map(new ParsePoint()); 

    RowMatrix mat = new RowMatrix(points.rdd()); 
    MultivariateStatisticalSummary summary = mat.computeColumnSummaryStatistics(); 

    System.out.println(summary.mean()); 

    JavaRDD<Vector> result=(JavaRDD<Vector>) summary.mean(); // ***** Throwing error**** 
    result.saveAsTextFile(args[1]); 

我們怎樣才能把結果存儲summary.mean()一份文件。上面的方法(在向量RDD中投射summary.mean())不起作用並給出該異常。java.lang.ClassCastException:org.apache.spark.mllib.linalg.DenseVector不能被轉換爲org.apache.spark.api.java.JavaRDD

+0

爲什麼不使用ObjectOutputStream保存summary.mean();? –

+0

我使用HDFS作爲源代碼。 ObjectOutputStream不能在HDFS中創建/修改文件。 –

回答

1

你不能只是任何類型X的對象到JavaRDD<X>。您需要使用SparkContext的parallelize方法創建RDD。 - 所以,如果你真的想用星火保存單一載體,你可以通過創建基於單個記錄收集的RDD做到這一點:

List<Vector> oneItemList = new LinkedList<>(); 
oneItemList.add(summary.mean()); 
JavaRDD<Vector> result = spark.parallelize(oneItemList); 
result.saveAsTextFile(args[1]); 

但是,這是對矯枉過正(使用星火救一個記錄)。

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.*; 

Path path = new Path(args[1]); 
Configuration conf = new Configuration(); // set your HDFS properties if needed 
FileSystem fileSystem = FileSystem.get(conf); 
// (assuming Java 7 or higher) 
try (FSDataOutputStream out = fileSystem.create(path)) { 
    out.writeBytes(summary.mean().toString()); 
    out.flush(); 
} 

注:

或者,您可以使用HDFS API,像保存HDFS文件的例子使用Java 7,Hadoop的V2.4,星火V1.5.2 - 但API是穩定,所以不應該其他最新版本的變化很大。

相關問題