2015-06-09 109 views
0

對於一個研究項目,我嘗試對RDD中的元素進行排序。我用兩種不同的方法做到了這一點。排序RDD元素

在第一種方法中,我在RDD上應用了mapPartitions()函數,以便對RDD的內容進行排序,並提供包含排序列表的結果RDD作爲RDD中的唯一記錄。然後,我應用了基本合併排序列表的reduce函數。

我在包含30個節點的EC2集羣上運行這些實驗。我使用spark ec2腳本進行設置。數據文件存儲在HDFS中。

在第二種方法中,我使用Spark中的sortBy方法。

我進行這些操作對美國人口普查數據(100MB)發現here

單線條看起來像這樣

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000. 

我基於CSV 25值排序。在這一行是1758.14。

我注意到sortBy執行比其他方法更糟糕。這是預期的情況嗎?如果是,爲什麼mapPartitions()和reduce()不是默認的排序方法?

這是我實現

public static void sortBy(JavaSparkContext sc){ 
     JavaRDD<String> rdd = sc.textFile("/data.txt",32); 
     long start = System.currentTimeMillis(); 
     rdd.sortBy(new Function<String, Double>(){ 

      @Override 
       public Double call(String v1) throws Exception { 
         // TODO Auto-generated method stub 
        String [] arr = v1.split(","); 
        return Double.parseDouble(arr[24]); 
       } 
     }, true, 9).collect(); 
     long end = System.currentTimeMillis(); 
     System.out.println("SortBy: " + (end - start)); 
    } 

public static void sortList(JavaSparkContext sc){ 
     JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8); 
     long start = System.currentTimeMillis(); 
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){ 

     @Override 
     public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t) 
      throws Exception { 
      // TODO Auto-generated method stub 
      LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>(); 
      while(t.hasNext()){  
      String s = t.next(); 
      String arr1[] = s.split(","); 
      Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s); 
      lines.add(t1); 
      } 
      Collections.sort(lines, new IncomeComparator()); 
      LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>(); 
      list.add(lines); 
      return list; 
     } 

     }); 
     rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){ 

     @Override 
     public LinkedList<Tuple2<Double, String>> call(
       LinkedList<Tuple2<Double, String>> a, 
       LinkedList<Tuple2<Double, String>> b) throws Exception { 
      // TODO Auto-generated method stub 
      LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>(); 
      while (a.size() > 0 && b.size() > 0) { 

      if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0) 
       result.add(a.poll()); 
      else 
       result.add(b.poll()); 
      } 

      while (a.size() > 0) 
      result.add(a.poll()); 

      while (b.size() > 0) 
      result.add(b.poll()); 

      return result; 

     } 

     });  
     long end = System.currentTimeMillis(); 
     System.out.println("MapPartitions: " + (end - start)); 
    } 
+0

這可能是一個更好的郵件列表問題。 –

回答

0

Collect()是一大瓶頸,因爲它返回所有結果給司機。
它產生IO命中&額外的網絡流量到單一源(在這種情況下 - 驅動程序)。
它也阻止其他操作。

代替在第一sortBy()代碼段collect(), 嘗試比讀回使用sc.textFile(tmp)執行並行操作,例如saveAsTextFile(tmp)

其他sortBy()代碼段同時使用mapPartitions()和reduce()並行API - 因此整個工作都是並行完成的。
似乎這是導致端到端性能差異的原因。

請注意,您的發現並不一定意味着所有機器的執行時間總和會變差。