2017-09-08 105 views

Spark ML streaming predictOnValues: how do I save the results?

I have the following code:

StreamingLinearRegressionWithSGD regressionWithSGD = 
     new StreamingLinearRegressionWithSGD() 
       .setInitialWeights(Vectors.zeros(featuresNumber)); 

JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache(); 
JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse); 
regressionWithSGD.trainOn(trainingData); 
regressionWithSGD.predictOnValues(testData.mapToPair(lp -> new Tuple2<>(lp.label(), lp.features()))).print(); 

I would like to send the results to a file, database, queue, etc. instead of calling print(). Is that possible?

Answer


I figured it out:

StreamingLinearRegressionWithSGD regressionWithSGD =
    new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(featuresNumber));

JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache();
JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse);
regressionWithSGD.trainOn(trainingData);
JavaDStream<Double> doubleJavaDStream = regressionWithSGD.predictOn(testData.map(labeledPoint -> labeledPoint.features()));
doubleJavaDStream.dstream().saveAsTextFiles("result", "out");

As a result, the predictions for each batch are written to a directory named result-{timestamp}.out.
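
To find those directories from other code, it can help to rebuild the name. The sketch below assumes the prefix-TIME_IN_MS.suffix convention that saveAsTextFiles uses, where the timestamp is the batch time in milliseconds. The class BatchOutputNames and the method batchDir are hypothetical helpers for illustration and are not part of Spark.

```java
// Hypothetical helper (not part of Spark): rebuilds the per-batch directory
// name, assuming the "prefix-TIME_IN_MS.suffix" convention of saveAsTextFiles.
final class BatchOutputNames {
    private BatchOutputNames() {}

    // prefix and suffix are the two arguments passed to saveAsTextFiles;
    // batchTimeMs is the batch time in milliseconds since the epoch.
    static String batchDir(String prefix, String suffix, long batchTimeMs) {
        String name = prefix + "-" + batchTimeMs;
        // Assumption: an empty or missing suffix adds no trailing ".suffix" part.
        return (suffix == null || suffix.isEmpty()) ? name : name + "." + suffix;
    }
}
```

For example, BatchOutputNames.batchDir("result", "out", 1504857600000L) gives result-1504857600000.out, which matches the prefix and suffix used in the answer above.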