2017-08-05 75 views
0

假設我想使用創建方式昂貴的對象來映射RDD。我想爲每個工作者/線程創建一個對象,並且必須在處理每個工作者上的RDD分區的項目之前創建它。在Apache Spark上爲每個工作人員創建一個單身人士

我的解決辦法是:

final Function0<ModelEvaluator> f =() -> { 

     if (ModelEvaluator.getInstance() == null) { 
      ModelEvaluator m = new ModelEvaluator(script); 
      ModelEvaluator.setInstance(m); 
     } 

     return ModelEvaluator.getInstance(); 
    }; 

    JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
      (t) -> { 
       try { 
        double val = f.call().evaluateModel(t); 
        return new Tuple2<>(val, t); 
       } catch (Exception ex) { 
        return null; 
       } 
      } 
    ); 



public class ModelEvaluator { 

    private static ModelEvaluator instance; 

    public static void setInstance(ModelEvaluator instance) { 
    ModelEvaluator.instance = instance; 
    } 

    public static ModelEvaluator getInstance() { 
     return instance; 
    } 
... 

在這種情況下,「ModelEvaluator」對象分析的腳本,然後使用「服務」對象的列表,以便計算出相應的響應度量配置模型參數該參數配置。但我不想在每次處理RDD行時解析腳本。

我還配置了我的集羣爲每個集羣創建一個進程,並且每個進程只會產生一個工人,因爲同一進程中多個工人同時訪問一個具有可變狀態的單例實例時會出現問題。

有沒有更適合我的問題的解決方案?

回答

1

這可以通過Broadcast變量完成。這將允許您在驅動程序上創建一個對象,並根據需要將其發送給每個工作人員一次。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script)); 

JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
     (t) -> { 
      try { 
       double val = model.value().evaluateModel(t); 
       return new Tuple2<>(val, t); 
      } catch (Exception ex) { 
       return null; 
      } 
     } 
); 
+1

非常感謝你,它像一個魅力工作。我需要製作「ModelEvaluator」類Serializable,並將某些字段配置爲瞬態以避免出現問題。我需要使用一些邏輯來執行對象的延遲初始化,而不是在構造函數上初始化它。 –