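top() does not work with JavaPairRDD in Apache Spark

I have extended the existing Java WordCount example explained on the official Apache Spark website. The extensions are:
- Sort the tuples by number of occurrences, highest first. (For example, existing unsorted order: (nec,8), (eu,11), (sit,7); the sorted order I want: (eu,11), (nec,8), (sit,7).)
- Take the top 3 from the sorted list.
The sorting works fine, but top() does not work on the JavaPairRDD. Let me paste my code. The other methods are the same as in the example, so I am only including my main method here: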
public static void main(String[] args) {
    if (args.length < 1) {
        System.err.println("Please provide the input file full path as argument");
        System.exit(0);
    }
    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
    JavaSparkContext context = new JavaSparkContext(conf);
    JavaRDD<String> file = context.textFile(args[0]);
    JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);
    /* Pairs with key = word and value = number of occurrences */
    JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);
    // First swap (key = number of occurrences, value = word) so that we can sort by number of occurrences
    JavaPairRDD<Integer, String> swappedPair = counter.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
        public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
            return item.swap();
        }
    });
    // After swapping, the tuples are sorted by number of occurrences (descending)
    JavaPairRDD<Integer, String> sortedCounter = swappedPair.sortByKey(false);
    // Reverse the swap
    JavaPairRDD<String, Integer> reverseSwappedPair = sortedCounter.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
        public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
            return item.swap();
        }
    });
    **reverseSwappedPair.top(3)**;
    reverseSwappedPair.saveAsTextFile(args[1]);
}
}
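Without the bold (**) line, the rest of the code runs fine and gives the correct result, i.e. the tuples sorted by the number of occurrences of each word. I added the bold line to get the top 3 tuples, but it throws the exception shown below. I also tried going through JavaRDD instead:
JavaRDD co = JavaRDD.fromRDD(JavaPairRDD.toRDD(reverseSwappedPair), reverseSwappedPair.classTag());
co.top(3);
But that throws the same exception. Please help me solve this. I have tried other options as well, without success.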
Exception:
15/06/23 07:21:28 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
at com.google.common.collect.NaturalOrdering.compare(NaturalOrdering.java:26)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at scala.math.Ordering$$anon$4.compare(Ordering.scala:111)
at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35)
at com.google.common.collect.Ordering.max(Ordering.java:572)
at com.google.common.collect.Ordering.leastOf(Ordering.java:688)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1331)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/06/23 07:21:28 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
at com.google.common.collect.NaturalOrdering.compare(NaturalOrdering.java:26)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at scala.math.Ordering$$anon$4.compare(Ordering.scala:111)
at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35)
at com.google.common.collect.Ordering.max(Ordering.java:572)
at com.google.common.collect.Ordering.leastOf(Ordering.java:688)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1331)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
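
The top frames of the trace (NaturalOrdering.compare) suggest that top(3), called without a comparator, falls back to natural ordering and casts each element to java.lang.Comparable, which scala.Tuple2 does not implement. The same kind of failure can be reproduced without Spark. The following is only a minimal sketch: SimpleEntry stands in for Tuple2 and a TreeSet stands in for the natural-ordering comparison, and the class and method names are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.TreeSet;

class NaturalOrderFailure {
    // Returns true if ordering non-Comparable pairs by natural order
    // fails with a ClassCastException (hypothetical helper for illustration).
    static boolean naturalOrderingFails() {
        // A TreeSet without a comparator uses natural ordering,
        // so it casts its elements to Comparable.
        TreeSet<Object> set = new TreeSet<Object>();
        try {
            // SimpleEntry, like scala.Tuple2, does not implement Comparable.
            set.add(new SimpleEntry<String, Integer>("eu", 11));
            set.add(new SimpleEntry<String, Integer>("nec", 8));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(naturalOrderingFails());
    }
}
```

If this reading of the trace is right, supplying an explicit comparator for the pair type avoids the cast to Comparable.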
Is it possible to give me some sample code? I really need this. –
You need to implement a comparator as follows (this is just an example; please change the comparison logic according to your needs): 'private static class CustomComaprator implements Serializable, Comparator{ public int compare(Object o11, Object o12){ Tuple2 o1 =(Tuple2 )o11; Tuple2 o2 =(Tuple2 )o12; if(o1._2> o2._2){ return o1._2; }else{ return o2._2; } } }', and when using the top API, use it like this: 'reverseSwappedPair.top(3, new CustomComaprator())'. Sorry for the formatting issue. –
Mayur
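
The comparator idea from the comment can be sketched in plain Java, without Spark on the classpath. This is only an illustration under stated assumptions: Map.Entry stands in for Tuple2, and TopByCount, CountComparator, pair and the local top helper are hypothetical names, not Spark APIs. Note that Comparator.compare is expected to return a negative, zero, or positive int rather than one of the counts, and that the comparator is made Serializable because Spark would need to ship it to the executors.

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class TopByCount {
    // Hypothetical comparator: orders (word, count) pairs by count only.
    static class CountComparator
            implements Comparator<Map.Entry<String, Integer>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
            // Negative, zero, or positive, as the Comparator contract requires.
            return Integer.compare(a.getValue(), b.getValue());
        }
    }

    // Small helper to build a (word, count) pair.
    static Map.Entry<String, Integer> pair(String word, int count) {
        return new SimpleEntry<String, Integer>(word, count);
    }

    // Local stand-in for top(n, comparator): the n largest pairs, largest first.
    static List<Map.Entry<String, Integer>> top(
            List<Map.Entry<String, Integer>> pairs, int n,
            Comparator<Map.Entry<String, Integer>> cmp) {
        List<Map.Entry<String, Integer>> sorted =
                new ArrayList<Map.Entry<String, Integer>>(pairs);
        sorted.sort(cmp.reversed());
        return new ArrayList<Map.Entry<String, Integer>>(
                sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = Arrays.asList(
                pair("nec", 8), pair("eu", 11), pair("sit", 7), pair("a", 2));
        // prints [eu=11, nec=8, sit=7]
        System.out.println(top(counts, 3, new CountComparator()));
    }
}
```

In the Spark program from the question, the analogous comparator would be declared over Tuple2<String, Integer>, read the count through the _2() accessor, and be passed as reverseSwappedPair.top(3, comparator). Since top(n, comparator) selects the largest elements itself, the swap and sortByKey steps would presumably not be needed just to obtain the top 3.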