2017-03-17 33 views
6

作爲this question的延續,請問能告訴我哪些屬性可以從SparkContext.setLocalProperties更改?我可以更換核心,RAM等嗎?SparkContext setLocalProperties

+0

我認爲這部分應該給你可用的屬性:http://spark.apache.org/docs/latest/configuration.html#available-properties – Adonis

+0

@asettouf是否意味着我可以運行在獨立羣集管理器上同時具有不同屬性的多個作業? –

+0

根據我的理解,它在你正在運行的線程上設置屬性,所以理論上可以產生一個新的線程,使用相同的SparkContext,並設置不同的本地屬性(注意不要試圖創建幾個SparkContext),就像doc似乎指向(我沒有測試,所以我不能肯定它會工作)http://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/SparkContext.html#setLocalProperty(java .lang.String,%20java.lang.String) – Adonis

回答

3

根據文檔說明localPropertiesSparkContextprotected[spark]屬性,它們是您可以通過其創建邏輯作業組的屬性。另一方面他們是Inheritable線程局部變量。這意味着當變量中維護的per-thread-attribute必須自動傳輸到創建的任何子線程時,它們優先於普通線程局部變量使用。當請求運行SparkContext時,將本地屬性傳播給worker或者提交一份Spark工作,然後將其傳遞給DAGScheduler

而在一般情況Local properties用來組工作進入FAIR作業調度程序池的spark.scheduler.pool每個線程的性能和方法SQLExecution.withNewExecutionId設置spark.sql.execution.id

我沒有這樣的經驗,在獨立的Spark集羣中分配線程本地屬性。值得嘗試和檢查它。

+0

感謝細節!我會在幾天內嘗試 –

3

我對屬性spark.executor.memory(可用屬性爲here)進行了一些測試,實際上在一個非常簡單的本地Spark上,啓動兩個線程,每個線程都有不同的設置,似乎只限於線程,沒有代碼,你會部署到生產)在這篇文章的結尾,使一些交錯的線程,以確保它不是通過一些純粹的調度運氣,我獲得以下輸出(清理火花輸出到我的控制檯):

Thread 1 Before sleeping mem: 512 
Thread 2 Before sleeping mem: 1024 
Thread 1 After sleeping mem: 512 
Thread 2 After sleeping mem: 1024 

非常整齊地觀察線程中聲明的屬性留在所述線程內,但我很確定它很容易導致無意義的si調整,所以我仍然建議在應用此類技術之前謹慎行事。

public class App { 
    private static JavaSparkContext sc; 
    public static void main(String[] args) { 
     SparkConf conf = new SparkConf().setMaster("local") 
       .setAppName("Testing App"); 
     sc = new JavaSparkContext(conf); 
     SparkThread Thread1 = new SparkThread(1); 
     SparkThread Thread2 = new SparkThread(2); 
     ExecutorService executor = Executors.newFixedThreadPool(2); 
     Future ThreadCompletion1 = executor.submit(Thread1); 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 
     Future ThreadCompletion2 = executor.submit(Thread2); 
     try { 
      ThreadCompletion1.get(); 
      ThreadCompletion2.get(); 
     } catch (InterruptedException | ExecutionException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 

    private static class SparkThread implements Runnable{ 
     private int i = 1; 
     public SparkThread(int i) { 
      this.i = i; 

     } 
     @Override 
     public void run() { 
      int mem = 512; 
      sc.setLocalProperty("spark.executor.memory", Integer.toString(mem * i)); 
      JavaRDD<String> input = sc.textFile("test" + i); 

      FlatMapFunction<String, String> tt = s -> Arrays.asList(s.split(" ")) 
        .iterator(); 
      JavaRDD<String> words = input.flatMap(tt); 
      System.out.println("Thread " + i + " Before sleeping mem: " + sc.getLocalProperty("spark.executor.memory")); 

      try { 
       Thread.sleep(7000); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      //do some work 
      JavaPairRDD<String, Integer> counts = words.mapToPair(t -> new Tuple2(t, 1)) 
        .reduceByKey((x, y) -> (int) x + (int) y); 

      counts.saveAsTextFile("output" + i); 
      System.out.println("Thread " + i + " After sleeping mem: " + sc.getLocalProperty("spark.executor.memory")); 
     } 

    } 
} 
0

LocalProperties提供一種簡單的機制來傳遞從所述驅動器向執行者(用戶定義)的配置。您可以使用執行程序上的TaskContext訪問它們。一個例子是SQL Execution ID

+0

請詳細說明。請給我舉一個例子,說明如何在同一個會話中更換不同作業的核心/ RAM? –

+0

@VolodymyrBakhmatiuk不知道我理解你的問題,因爲給定可用的Spark屬性,只要將它們正確地封裝在不同的線程中,就可以修改驅動程序核心和內存,以及每個作業的執行程序內存。我在這裏錯過了什麼嗎? – Adonis

+0

@asettouf我只是想知道赫爾曼的想法和你的想法有什麼不同 –

相關問題