2017-08-06

Sorting a JavaPairRDD by a key of type Tuple2<Integer, Integer> in Apache Spark

I want to sort my JavaPairRDD by key, so I wrote a comparator like this:

JavaPairRDD<Tuple2<Integer, Integer>, Integer> Rresult = result.sortByKey(new Comparator<Tuple2<Integer, Integer>>() {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        if (o1._1() == o2._1())
            return o1._2() - o2._2();
        return o1._1() - o2._1();
    }
}, true);

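This sorts the entries by the first element of the key tuple and, when the first elements are equal, by the second element.

But I get the following error stack trace: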
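
To make the intended ordering concrete, here is a minimal sketch in plain Java with no Spark involved. The `Integer[]` pairs are only a hypothetical stand-in for `Tuple2<Integer, Integer>`, and the class name, method name, and sample values are made up for illustration. It uses `Integer.compare` rather than `==` and subtraction, since `==` on boxed `Integer` values compares references and subtraction can overflow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PairOrderingSketch {
    // Hypothetical stand-in for Tuple2<Integer, Integer>: a two-element Integer array.
    // Compare by the first element, then by the second when the first elements are equal.
    static final Comparator<Integer[]> BY_FIRST_THEN_SECOND = (a, b) -> {
        int byFirst = Integer.compare(a[0], b[0]);
        return byFirst != 0 ? byFirst : Integer.compare(a[1], b[1]);
    };

    // Returns a sorted copy and leaves the input list untouched.
    static List<Integer[]> sorted(List<Integer[]> keys) {
        List<Integer[]> copy = new ArrayList<>(keys);
        copy.sort(BY_FIRST_THEN_SECOND);
        return copy;
    }

    public static void main(String[] args) {
        List<Integer[]> keys = Arrays.asList(
            new Integer[] {2, 444}, new Integer[] {3, 333},
            new Integer[] {1, 111}, new Integer[] {2, 222});
        for (Integer[] k : sorted(keys)) {
            System.out.println("(" + k[0] + "," + k[1] + ")");
        }
    }
}
```

With these sample values the two keys whose first element is 2 end up next to each other, ordered by their second element.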

java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

.. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1083) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) 
    at 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 
    at java.io.ObjectStrea 

Answer

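How are you creating the JavaPairRDD? Please check it before applying the sort. You will also get a "Task not serializable" exception if you pass a new anonymous comparator directly to the sortByKey method, because the comparator has to be serialized along with the task. You should implement Comparator and Serializable in a separate class and pass an instance of it to sortByKey. Below is a sample for your reference.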

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkSortSample {
    public static void main(String[] args) {
        // SparkSession running locally with a single worker thread
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSortSample")
                .master("local[1]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        // Sample data: ((first, second), value)
        List<Tuple2<Tuple2<Integer, Integer>, Integer>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(new Tuple2<>(2, 444), 4444));
        inputList.add(new Tuple2<>(new Tuple2<>(3, 333), 3333));
        inputList.add(new Tuple2<>(new Tuple2<>(1, 111), 1111));
        inputList.add(new Tuple2<>(new Tuple2<>(2, 222), 2222));
        // JavaPairRDD keyed by the tuple
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> javaPairRdd = jsc.parallelizePairs(inputList);
        // Sorted RDD, ascending, using the serializable comparator
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> sortedPairRDD = javaPairRdd.sortByKey(new TupleComparator(), true);
        sortedPairRDD.foreach(pair -> System.out.println("sort = " + pair));
        // Stop the Spark context
        jsc.stop();
    }
}

Here is the TupleComparator class, which implements both the Comparator and Serializable interfaces.

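import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

class TupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        // Integer.compare avoids reference comparison of boxed Integers (==) and subtraction overflow.
        int byFirst = Integer.compare(o1._1(), o2._1());
        return byFirst != 0 ? byFirst : Integer.compare(o1._2(), o2._2());
    }
}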
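
If you want to see why the Serializable part matters without starting Spark, the following is a rough sketch that only exercises plain Java serialization, which is an assumption about what fails in your job rather than a reproduction of it. The class and method names are hypothetical. It writes two comparators to an ObjectOutputStream: a plain anonymous Comparator, and a lambda declared with an intersection cast to `Comparator & Serializable`, which may be a shorter alternative to a separate class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Comparator;

public class ComparatorSerializationCheck {
    // Returns true if Java serialization accepts the object, false if it rejects it.
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Anonymous comparator: Comparator itself does not extend Serializable.
    static Comparator<Integer> plainComparator() {
        return new Comparator<Integer>() {
            @Override
            public int compare(Integer a, Integer b) {
                return Integer.compare(a, b);
            }
        };
    }

    // Lambda with an intersection cast, so the resulting object is also Serializable.
    static Comparator<Integer> serializableComparator() {
        return (Comparator<Integer> & Serializable) (a, b) -> Integer.compare(a, b);
    }

    public static void main(String[] args) {
        System.out.println("anonymous comparator serializable: "
                + isJavaSerializable(plainComparator()));
        System.out.println("intersection-cast lambda serializable: "
                + isJavaSerializable(serializableComparator()));
    }
}
```

The first check reports false and the second reports true. A comparator that passes this check can still fail inside Spark if it captures something that is not serializable, so treat it as a quick sanity test only.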