2016-08-10 149 views
0

負載RandomForestModel我訓練和使用星火mllib一個相當沉重的隨機森林模型1.3.0在星火使用流

我想用一個Spark流工作用它實時分類保存(拼花格式),但面臨的困難很少,主要與模型的大小和將其發送給工人的必要性有關。

現在,我看到3個解決方案,其中沒有一個是理想的:

  • 加載駕駛員側,將其發送給員工上的每個計算微批次。示例代碼:

    val model = RandomForestModel.load(sc, path) 
    stream.map(smthg => model.predict(...)) 
    

    這裏的問題是它會在每批中序列化併發送模型。有關信息,我必須將spark.akka.frameSize設置爲大於50MB才能執行而不出錯。這顯然是不可持續的

  • 加載每個工人的模型。在Spark 1.3中,似乎沒有辦法獲取當前的SparkContext,所以我必須爲每個工作人員創建一個新的,以便能夠加載模型。這意味着/所需的第一條微一批集羣上額外的CPU工作迭代

  • 駕駛員側執行預測:

    val model = RandomForestModel.load(sc, path) 
    stream 
        .map(smthg => stuff) 
        .foreachRDD(rdd => model.predict(rdd)) 
    

    由於不能夠很好地擴展的缺點,因爲一切都發生在司機側。

回答

0

首先最後一種方法不在驅動程序上執行,實際上是no different than the first one

def predict(features: RDD[Vector]): RDD[Double] = features.map(x => predict(x)) 

隨機森林模型不是分佈式和可序列化的,所以你可以使用標準的工具來編寫:

import java.io._ 
import org.apache.spark.mllib.tree.model.RandomForestModel 

val model: RandomForestModel = ??? 

val os = new ObjectOutputStream(new FileOutputStream("/tmp/rf")) 
os.writeObject(model) 
os.close() 

後來閱讀:

val is = new ObjectInputStream(new FileInputStream("/tmp/rf") 
val model = is.readObject().asInstanceOf[RandomForestModel] 
is.close 

但這種方法是不能移植到新的ML API,因此在實踐中調整管道以減少模型尺寸更有意義。