
I wrote a small Spark application that should measure the time Spark needs to run an operation on a partitioned RDD (a combineByKey function that sums up a value). How can I measure the time Spark needs for this operation on a partitioned RDD?

My problem is that the first iteration seems to work correctly (measured duration ~25 ms), but the following ones take much less time (~5 ms). It looks to me as if Spark persists the data without being asked to!? Can I avoid that programmatically?

I have to know the duration Spark needs to compute a fresh RDD (without any caching/persisting from earlier iterations), so I think the duration should always be around 20-25 ms.
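For reference, this kind of measurement can be sketched in a small, self-contained form like the one below. It is only an illustration: the hard-coded in-memory input and the simple reduceByKey stand in for the real S3 file and the combineByKey used in the actual code, and the timing here wraps the action (count) that actually triggers the job.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class TimingSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf(true).setAppName("TimingSketch").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder input instead of the S3 text file
        JavaRDD<String> input = sc.parallelize(Arrays.asList("a,1", "b,2", "a,3"));

        JavaPairRDD<String, Integer> pairs = input.mapToPair(line -> {
            String[] fields = line.split(",");
            return new Tuple2<>(fields[0], Integer.valueOf(fields[1]));
        });

        // Time the whole job: the action (count) is what triggers the computation
        long start = System.currentTimeMillis();
        long keys = pairs.reduceByKey((a, b) -> a + b).count();
        long duration = System.currentTimeMillis() - start;

        System.out.println(keys + " keys, computed in " + duration + " ms");
        sc.stop();
    }
}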

To force recomputation I moved the creation of the SparkContext into the for loop, but that did not change anything.

Thanks for your suggestions!

Here is my code, which still seems to persist some data:

public static void main(String[] args) { 

    switchOffLogging(); 

    try { 
     // Setup: Read out parameters & initialize SparkContext 
     String path = args[0]; 
     SparkConf conf = new SparkConf(true); 
     JavaSparkContext sc; 

     // Print the header of the result table 
     System.out.println("\npar.\tCount\tinput.p\tcons.p\tTime"); 

     // The RDDs used for the benchmark 
     JavaRDD<String> input = null; 
     JavaPairRDD<Integer, String> pairRDD = null; 
     JavaPairRDD<Integer, String> partitionedRDD = null; 
     JavaPairRDD<Integer, Float> consumptionRDD = null; 

     // Run the same benchmark iteratively (10 times) for testing 
     for (int i = 0; i < 10; i++) { 
      boolean partitioning = true; 
      int partitionsCount = 8; 

      sc = new JavaSparkContext(conf); 
      setS3credentials(sc, path); 

      input = sc.textFile(path); 
      pairRDD = mapToPair(input); 

      partitionedRDD = partition(pairRDD, partitioning, partitionsCount); 

      // Measure the duration 
      long duration = System.currentTimeMillis(); 
      // Do the relevant function 
      consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners); 
      duration = System.currentTimeMillis() - duration; 

      // Do some action to trigger the computation 
      System.out.println(consumptionRDD.collect().size()); 

      // Print the results 
      System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t" + input.partitions().size() + "\t" + consumptionRDD.partitions().size() + "\t" + duration + " ms"); 

      input = null; 
      pairRDD = null; 
      partitionedRDD = null; 
      consumptionRDD = null; 

      sc.close(); 
      sc.stop(); 

     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
     System.out.println(e.getMessage()); 
    } 
} 

Some helper functions (which should not be the problem):

private static void switchOffLogging() { 
    Logger.getLogger("org").setLevel(Level.OFF); 
    Logger.getLogger("akka").setLevel(Level.OFF); 
} 

private static void setS3credentials(JavaSparkContext sc, String path) { 
    if (path.startsWith("s3n://")) { 
     Configuration hadoopConf = sc.hadoopConfiguration(); 
     hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); 
     hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); 
     hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials"); 
     hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials"); 
    } 
} 

// Initial element 
private static Function<String, Float> createCombiner = new Function<String, Float>() { 
    public Float call(String dataSet) throws Exception { 
     String[] data = dataSet.split(","); 
     float value = Float.valueOf(data[2]); 
     return value; 
    } 
}; 

// merging function for a new dataset 
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() { 
    public Float call(Float sumYet, String dataSet) throws Exception { 
     String[] data = dataSet.split(","); 
     float value = Float.valueOf(data[2]); 
     sumYet += value; 
     return sumYet; 
    } 
}; 

// function to sum the consumption 
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() { 
    public Float call(Float a, Float b) throws Exception { 
     a += b; 
     return a; 
    } 
}; 

private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) { 
    if (partitioning) { 
     return pairRDD.partitionBy(new HashPartitioner(partitionsCount)); 
    } else { 
     return pairRDD; 
    } 
} 

private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) { 
    return input.mapToPair(new PairFunction<String, Integer, String>() { 
     public Tuple2<Integer, String> call(String debsDataSet) throws Exception { 
      String[] data = debsDataSet.split(","); 
      int houseId = Integer.valueOf(data[6]); 
      return new Tuple2<Integer, String>(houseId, debsDataSet); 
     } 
    }); 
} 
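Taken together, the three combineByKey functions above can be exercised on a tiny hard-coded dataset, for example like this (an illustrative sketch only; sc is the JavaSparkContext from the main method, the records are made up, and the value that gets summed sits in field 2 of each record):

// Illustrative only: apply the combineByKey triple to a few hard-coded records 
List<Tuple2<Integer, String>> records = Arrays.asList( 
        new Tuple2<Integer, String>(1, "0,0,1.5,0,0,0,1"), 
        new Tuple2<Integer, String>(1, "0,0,2.5,0,0,0,1"), 
        new Tuple2<Integer, String>(2, "0,0,4.0,0,0,0,2")); 

JavaPairRDD<Integer, String> demo = sc.parallelizePairs(records); 
JavaPairRDD<Integer, Float> sums = demo.combineByKey(createCombiner, mergeValue, mergeCombiners); 

// Expected result: {1=4.0, 2=4.0} 
System.out.println(sums.collectAsMap()); 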

And the final output on the Spark console:

part. Count input.p cons.p Time 
true 8  6  8  20 ms 
true 8  6  8  23 ms 
true 8  6  8  7 ms  // too low!!! 
true 8  6  8  21 ms 
true 8  6  8  13 ms 
true 8  6  8  6 ms  // too low!!! 
true 8  6  8  5 ms  // too low!!! 
true 8  6  8  6 ms  // too low!!! 
true 8  6  8  4 ms  // too low!!! 
true 8  6  8  7 ms  // too low!!! 

Answers


I have now found a solution for myself: I wrote a separate class that calls the spark-submit command in a new process. This can be done in a loop, so every benchmark is started in a fresh process and the SparkContext is also separated per process. That way garbage collection runs and everything works fine! A sketch of the surrounding loop follows after the snippet below.

String submitCommand = "/root/spark/bin/spark-submit " + submitParams + " --class partitioning.PartitionExample /root/partitioning.jar " + javaFlags; 
Process p = Runtime.getRuntime().exec(submitCommand); 

BufferedReader reader; 
String line; 

System.out.println(p.waitFor()); 
reader = new BufferedReader(new InputStreamReader(p.getInputStream()));   
while ((line = reader.readLine()) != null) { 
    System.out.println(line); 
} 
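The surrounding loop could look roughly like this (a sketch; it assumes submitParams and javaFlags are assembled elsewhere and that the enclosing method declares throws Exception). Reading the child's output before waitFor() also avoids the run blocking when the output no longer fits into the pipe buffer:

// One spark-submit process per run, and therefore one fresh SparkContext and JVM per run 
for (int run = 0; run < 10; run++) { 
    Process p = Runtime.getRuntime().exec( 
            "/root/spark/bin/spark-submit " + submitParams 
            + " --class partitioning.PartitionExample /root/partitioning.jar " + javaFlags); 

    // Forward the child's console output, then wait for this benchmark run to finish 
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { 
        String line; 
        while ((line = reader.readLine()) != null) { 
            System.out.println(line); 
        } 
    } 
    System.out.println("run " + run + " exited with code " + p.waitFor()); 
} 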

If the shuffle output is small enough, the Spark shuffle files will end up in the OS buffer cache, since fsync is not explicitly called... which means that, as long as there is room, your data stays in memory.

If cold performance is really what you need, you could try something like this attempt to flush the disk, but that is going to slow down the in-between parts of each test. Could you spin the context up and down? That might solve your need.
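In code, such a flush could be triggered between runs roughly like this (a rough sketch, not taken from the linked answer; it assumes a Linux node and root privileges, and it has to run on every node that holds shuffle files):

// Hypothetical helper: flush dirty pages and drop the Linux page cache between benchmark runs 
private static void dropOsCaches() throws Exception { 
    Process p = Runtime.getRuntime().exec(new String[] { 
            "/bin/sh", "-c", "sync && echo 3 > /proc/sys/vm/drop_caches" }); 
    System.out.println("drop_caches exit code: " + p.waitFor()); 
} 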


I tried the commands from the [link](http://superuser.com/a/319287), which says to run 1) "sync" and 2) "echo 3 > /proc/sys/vm/drop_caches" (via sudo), without success... I also tried the SparkConf.set approach **SparkConf conf = new SparkConf(true).set("spark.files.useFetchCache", "false");** but that had no effect either.
