2017-04-13 40 views
0

我有一個Spark流工作,其目標是:如何在火花傳輸作業期間更新ML模型而無需重新啓動應用程序?

  • 使用預訓練ML管道

的讀取一批消息

  • 預測變量Y的給這些消息問題是,我希望能夠更新執行者使用的模型,而無需重新啓動應用程序。

    簡單地說,這裏是什麼樣子:

    model = #model initialization 
    
    def preprocess(keyValueList): 
        #do some preprocessing 
    
    def predict(preprocessedRDD): 
        if not preprocessedRDD.isEmpty(): 
         df = #create df from rdd 
         df = model.transform(df) 
         #more things to do 
    
    stream = KafkaUtils.createDirectStream(ssc, [kafkaTopic], kafkaParams) 
    
    stream.mapPartitions(preprocess).foreachRDD(predict) 
    

    在這種情況下,模型簡單地使用。未更新。

    我已經考慮過幾種可能性,但現在我已經越過他們全力以赴:

    • 廣播它改變了模型每次(不能更新,只讀)
    • 從HDFS讀取模型上執行者(它需要SparkContext所以不可能)

    任何想法?

    非常感謝!

  • +0

    您使用哪種ML算法? Spark中有幾種面向流的算法。我猜你已經排除了這些,但是爲了以防萬一。 – ImDarrenG

    +0

    嗨,目前我們正在考慮使用Word2Vec來構建功能。我們還沒有選擇模型,它必須是靈活的。 –

    +0

    在這種情況下,我會使用Flask構建一個REST API,它會公開調用train,更新並預測您通過來自'predict(預處理的RDD)'函數的HTTP請求調用。這樣的解耦將爲您提供更大的靈活性,包括您選擇的模型,更新頻率,存儲位置以及升級模型和流式作業代碼的方式。 – ImDarrenG

    回答

    -1

    您傳遞給foreachRDD的函數由驅動程序執行,它只是由執行程序執行的rdd操作本身,因此您不需要序列化模型 - 假設您使用的是操作Spark ML管道在RDD上,據我所知他們都這樣做。 Spark爲您處理訓練/預測,您無需手動分發。

    0

    我在兩種不同的方法來解決這個問題之前:

    • 型號
    • 重讀每批模型中的TTL

    這兩項解決方案,假設一個額外的在職培訓記錄你經常積累的數據(例如每天一次)。

    +0

    我在廣播文檔中沒有看到提及的任何TTL。你能提供一個鏈接嗎? 此外,你是一個低調的ImDarrenG?如果是這樣,請注意解釋一下? –

    +0

    它不是內置API的一部分,它通常需要依靠外部配置機制(如Zookeeper或Consul)來實現,您可以在其中存儲模型無效的時間。 – BenFradet

    +0

    另外,我個人使用'transform'對每個'RDD'進行批量預測,並保留'foreachRDD'來輸出結果。由於它沒有提供對這個問題的答案,我低估了。 – BenFradet

    相關問題