2017-04-15 181 views
0

所以我一直在嘗試幾天來在Spark的map函數裏面運行ML算法。我貼一個更具體的question但引用星火的ML算法使我有以下錯誤:在Spark中運行ML算法裏面的map函數

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized? 

很明顯,我不能引用SparkContextapply_classifier函數內。 我的代碼是類似於在前面的問題,我問建議,但至今還沒有找到一個解決我所期待的:

def apply_classifier(clf): 
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3) 
    if clf == 0: 
     clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3) 
    elif clf == 1: 
     clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5) 

classifiers = [0, 1] 

sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect() 

我一直在使用flatMap代替map嘗試,但我得到NoneType object is not iterable

我還想在apply_classifier函數中傳遞一個廣播數據集(這是一個DataFrame)作爲參數。 最後,我有可能做我想做的事情嗎?有什麼選擇?

回答

5

is it possible to do what I am trying to do?

它不是。 Apache Spark不支持任何形式的嵌套,分佈式操作只能由驅動程序初始化。這包括訪問分佈式數據結構,如Spark DataFrame

What are the alternatives?

這取決於許多因素,如數據的大小,可用資源的數量和算法的選擇。一般而言,您有三種選擇:

  • 僅使用Spark作爲任務管理工具來訓練本地非分佈式模型。看起來你已經在某種程度上探索了這條道路。要進一步實施此方法,您可以檢查spark-sklearn

    一般來說,這種方法在數據相對較小時特別有用。它的優點是多個工作之間沒有競爭。

  • 使用標準的多線程工具從一個上下文中提交多個獨立的作業。您可以使用例如threadingjoblib

    雖然這種方法是可能的,但我不會在實踐中推薦它。並非所有Spark組件都是線程安全的,並且必須非常小心才能避免意外行爲。它也使您很少控制資源分配。

  • 參數化您的Spark應用程序並使用外部管線管理器(Apache Airflow,Luigi,Toil)提交您的工作。雖然這種方法有一些缺點(它需要將數據保存到持久性存儲),但它也是最普遍和最健壯的,並且對資源分配提供了很多控制。

+0

謝謝你的回答。我會檢查這些外部管道經理! –