2017-11-03 129 views
0

剛做了一些關於spark unpersist()的實驗,並對實際做了什麼感到困惑。我搜索了很多,幾乎所有的人都說unpersist()會立即從excutor的內存中驅逐RDD。但在這個測試中,我們可以看到它並不總是真實的。請參見下面的簡單的測試:Spark unpersist()有不同的策略嗎?

private static int base = 0; 
public static Integer[] getInts(){ 
    Integer[] res = new Integer[5]; 
    for(int i=0;i<5;i++){ 
     res[i] = base++; 
    } 
    System.out.println("number generated:" + res[0] + " to " + res[4] + "---------------------------------"); 
    return res; 
} 

public static void main(String[] args) 
{ 
    SparkSession sparkSession = SparkSession.builder().appName("spark test").getOrCreate(); 
    JavaSparkContext spark = new JavaSparkContext(sparkSession.sparkContext()); 

    JavaRDD<Integer> first = spark.parallelize(Arrays.asList(getInts())); 
    System.out.println("first: " + Arrays.toString(first.collect().toArray())); // action 
    first.unpersist(); 
    System.out.println("first is unpersisted"); 

    System.out.println("compute second ========================"); 
    JavaRDD<Integer> second = first.map(i -> { 
     System.out.println("double " + i); 
     return i*2; 
    }).cache(); // transform 
    System.out.println("second: " + Arrays.toString(second.collect().toArray())); // action 
    second.unpersist(); 

    System.out.println("compute third ========================"); 
    JavaRDD<Integer> third = second.map(i -> i+100); // transform 
    System.out.println("third: " + Arrays.toString(third.collect().toArray())); // action 
} 

輸出爲:

number generated:0 to 4--------------------------------- 
first: [0, 1, 2, 3, 4] 
first is unpersisted 
compute second ======================== 
double 0 
double 1 
double 2 
double 3 
double 4 
second: [0, 2, 4, 6, 8] 
compute third ======================== 
double 0 
double 1 
double 2 
double 3 
double 4 
third: [100, 102, 104, 106, 108] 

正如我們所看到的,unpersist() '第一' 是無用的,它不會重新計算。 但unpersist()'second'將觸發重新計算。 任何人都可以幫我弄清楚爲什麼unpersist()'第一個'不會觸發重新計算?如果我想強迫'第一'被驅逐出內存,我該怎麼辦?並行或textFile()API的RDD有什麼特別之處嗎? 謝謝!

回答

1

此行爲與緩存無關,並且unpersisting。實際上first甚至不是persisted,雖然在這裏沒有太大的區別。

當你parallelize,你通過一個本地,非分佈式的對象。 parallelize的參數的值爲,其生命週期完全超出了Spark的範圍。因此,一旦ParallelCollectionRDD已經初始化,Spark根本沒有理由重新計算它。如果你想分發不同的集合,只需創建一個新的RDD

還值得注意的是,unpersist可以在阻塞和非阻塞模式下調用,具體取決於blocking參數。