top() does not work with JavaPairRDD in Apache Spark

I have extended the existing WordCount example in Java explained on the official Apache Spark website. The extensions are:

1. Sort the tuples by how often each word occurs. (Existing unsorted order, for example: (nec,8), (eu,11), (sit,7); the sorted order I want: (eu,11), (nec,8), (sit,7).)
2. Take the top 3 tuples from the sorted list.

Sorting works fine, but top() does not work on the JavaPairRDD. Let me paste my code; the other methods are the same as in the official example, so I only show my main method here:
public static void main(String[] args) {
    if (args.length < 1) {
        System.err.println("Please provide the input file full path as argument");
        System.exit(0);
    }

    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
    JavaSparkContext context = new JavaSparkContext(conf);
    JavaRDD<String> file = context.textFile(args[0]);
    JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);

    /* Pairs with key = word and value = no. of occurrences */
    JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);

    // First swap (key = no. of occurrences, value = word) to allow sorting by the number of occurrences
    JavaPairRDD<Integer, String> swappedPair = counter.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
        public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
            return item.swap();
        }
    });

    // After swapping, sort the tuples by the number of occurrences
    JavaPairRDD<Integer, String> sortedCounter = swappedPair.sortByKey(false);

    // Reverse the swap
    JavaPairRDD<String, Integer> reverseSwappedPair = sortedCounter.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
        public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
            return item.swap();
        }
    });

    reverseSwappedPair.top(3);   // <-- this is the line that throws the exception
    reverseSwappedPair.saveAsTextFile(args[1]);
}

Without the marked top(3) line, the rest of the code runs fine and gives the correct result, i.e. the tuples ordered by the number of word occurrences. I added that line to get the top 3 tuples, but it throws the exception shown below. I also tried another JavaRDD option:

JavaRDD<Tuple2<String, Integer>> co = JavaRDD.fromRDD(JavaPairRDD.toRDD(reverseSwappedPair), reverseSwappedPair.classTag());
co.top(3);

But it also throws the same exception shown below. Please help me solve this issue; I have tried other options as well, with no result.

Exception: 
15/06/23 07:21:28 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) 
java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable 
    at com.google.common.collect.NaturalOrdering.compare(NaturalOrdering.java:26) 
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153) 
    at scala.math.Ordering$$anon$4.compare(Ordering.scala:111) 
    at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35) 
    at com.google.common.collect.Ordering.max(Ordering.java:572) 
    at com.google.common.collect.Ordering.leastOf(Ordering.java:688) 
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37) 
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1334) 
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1331) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
15/06/23 07:21:28 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable 
    at com.google.common.collect.NaturalOrdering.compare(NaturalOrdering.java:26) 
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153) 
    at scala.math.Ordering$$anon$4.compare(Ordering.scala:111) 
    at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35) 
    at com.google.common.collect.Ordering.max(Ordering.java:572) 
    at com.google.common.collect.Ordering.leastOf(Ordering.java:688) 
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37) 
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1334) 
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1331) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745)

Answer


I think you can use the other API: java.util.List<T> top(int num, java.util.Comparator<T> comp)

It cannot compare two tuples directly. Write your own custom comparator. Hope this helps.
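In other words, a minimal sketch of the call (WordCountComparator is a placeholder for a comparator class you still have to write yourself):

// Sketch: the two-argument overload of top() uses your comparator instead of
// natural ordering, so the elements no longer need to implement Comparable.
List<Tuple2<String, Integer>> top3 = reverseSwappedPair.top(3, new WordCountComparator());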


Is it possible to give me a sample code example? I really need this. –


You need to implement a comparator like this (this is just an example; please change the comparison logic according to your needs):

private static class CustomComparator implements Serializable, Comparator<Tuple2<String, Integer>> {
    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
        return o1._2().compareTo(o2._2());
    }
}

and while using the top API, use it as follows: reverseSwappedPair.top(3, new CustomComparator()). Very sorry for the formatting issues. – Mayur
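Side note: a comparator may not even be necessary here. sortByKey(false) already ordered the pairs by count (descending), and the final mapToPair preserves that order, so a minimal sketch without any comparator could be:

// The RDD is already sorted by count (descending), so the first three
// elements should be the top three; no comparator is needed.
List<Tuple2<String, Integer>> top3 = reverseSwappedPair.take(3);

This assumes nothing repartitions the RDD between sortByKey and take.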
