2016-02-26 56 views
3

我們試圖實現一個簡單的spark作業,它讀取一個CSV文件(1行數據),並使用預先構建的隨機森林模型對象進行預測。這項工作不包括任何數據預處理或數據處理。減少Apache Spark作業/應用程序的運行時間

我們在獨立模式下運行spark,應用程序在本地運行。的配置如下: RAM:8GB 內存:40GB 芯數:2 火花版本:1.5.2 Scala的版本:2.10.5 輸入文件大小:1KB(1行的數據) 模型文件大小:1,595 KB(400樹隨機森林)

目前,spark-submit中的實現大約需要13秒。然而,運行時間是該應用程序,因此

  1. 巨大關注的是有沒有辦法來優化代碼,使運行時間縮短至1或2秒? (高優先級)

  2. 我們注意到,啓動時實際代碼的執行時間約爲7-8秒,設置上下文大約需要5-6秒,所以有一種方法可以在運行時保持火花上下文運行火花提交。

這裏是應用程序代碼

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object RF_model_App { 
    def main(args: Array[String]) { 

val conf = new SparkConf().setAppName("Simple Application") 
val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.feature4.{RandomForestfeature4Model, RandomForestClassifier} 
import org.apache.spark.ml.evaluation.Multiclassfeature4Evaluator 
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer} 
import org.apache.spark.sql.functions.udf 
import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.ml.feature.StringIndexer 
import sqlContext.implicits._ 
val Test = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/home/ubuntu/Test.csv") 
Test.registerTempTable("Test") 
val model_L1 = sc.objectFile[RandomForestfeature4Model]("/home/ubuntu/RF_L1.model").first() 

val toInt = udf[Int, String](_.toInt) 
val toDouble = udf[Double, String](_.toDouble) 
val featureDf = Test.withColumn("id1", toInt(Test("id1"))) .withColumn("id2", toInt(Test("id2"))) .withColumn("id3", toInt(Test("id3"))) .withColumn("id4", toInt(Test("id4"))) .withColumn("feature3", toInt(Test("feature3"))) .withColumn("feature9", toInt(Test("feature9"))) .withColumn("feature10", toInt(Test("feature10"))) .withColumn("feature12", toInt(Test("feature12"))) .withColumn("feature14", toDouble(Test("feature14"))) .withColumn("feature15", toDouble(Test("feature15"))) .withColumn("feature16", toInt(Test("feature16"))) .withColumn("feature17", toDouble(Test("feature17"))) .withColumn("feature18", toInt(Test("feature18"))) 

val feature4_index = new StringIndexer() .setInputCol("feature4") .setOutputCol("feature4_index") 
val feature6_index = new StringIndexer() .setInputCol("feature6") .setOutputCol("feature6_index") 
val feature11_index = new StringIndexer() .setInputCol("feature11") .setOutputCol("feature11_index") 
val feature8_index = new StringIndexer() .setInputCol("feature8") .setOutputCol("feature8_index") 
val feature13_index = new StringIndexer() .setInputCol("feature13") .setOutputCol("feature13_index") 
val feature2_index = new StringIndexer() .setInputCol("feature2") .setOutputCol("feature2_index") 
val feature5_index = new StringIndexer() .setInputCol("feature5") .setOutputCol("feature5_index") 
val feature7_index = new StringIndexer() .setInputCol("feature7") .setOutputCol("feature7_index") 
val vectorizer_L1 = new VectorAssembler() .setInputCols(Array("feature3", "feature2_index", "feature6_index", "feature4_index", "feature8_index", "feature7_index", "feature5_index", "feature10", "feature9", "feature12", "feature11_index", "feature13_index", "feature14", "feature15", "feature18", "feature17", "feature16")).setOutputCol("features_L1") 
val feature_pipeline_L1 = new Pipeline() .setStages(Array(feature4_index, feature6_index, feature11_index,feature8_index, feature13_index, feature2_index, feature5_index, feature7_index,vectorizer_L1)) 
val testPredict= feature_pipeline_L1.fit(featureDf).transform(featureDf) 
val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1)) 
val getid2 = udf((v: Int) => v) 
val L1_output = model_L1.transform(testPredict).select(getid2($"id2") as "id2",getid2($"prediction") as "L1_prediction",getPOne($"probability") as "probability") 

L1_output.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").mode("overwrite").save("/home/L1_output") 

    } 
}; 

回答

1

讓我們開始用東西是完全錯誤:您使用

  • 特性機制僅僅是不正確。 StringIndexer根據數據分佈分配索引,因此相同的記錄根據其他記錄具有不同的編碼。您應該使用相同的StringIndexerModel(-s)進行培訓,測試和預測。
  • val getid2 = udf((v: Int) => v)只是一個昂貴的身份。

持續SparkContext

有多種工具,保持持續的背景下,包括job-serverLivy

最後,您可以簡單地使用Spark Streaming並只處理數據。

洗牌

您同時還使用repartition創建一個單一的,因此,我想一個CSV文件。這個操作非常昂貴,但是根據定義,它隨機重新刷新RDD中的數據以創建更多或更少的分區並在其間進行平衡。這總是通過網絡混洗所有數據。

其他考慮

如果延遲是重要的,你只使用一個單一的,低性能的機器,不使用火花可言。這裏沒有什麼可以獲得的。一個好的本地圖書館在這種情況下可以做得更好。

注意

我們不會訪問您的數據或您的硬件,因此任何要求,希望縮短時間7秒是完全沒有意義的。

相關問題