2015-10-05 79 views
0

我試圖在以下文本文件上運行sortByKey()函數。Spark Scala中的sortByKey()函數無法正常工作

EMP_NAME EMP_ID SALARY 
Adam  22  100 
Bob  25  102 
Bob  28  104 
Chris 29  110 

我正在EMP_NAME作爲下列文本文件的關鍵。我運行下面的命令:textFile.sortByKey() 我正在以下的輸出:

Bob 
Bob 
Adam 
Chris 

幫助是appreciated..Thank你。

+3

請添加您的代碼,以便於您的幫助。 – ale64bit

回答

4

如果您正在使用SparkConffiguration作爲

val conf = new SparkConf().setMaster("local") 

則默認情況下創建的分區的數量爲1

但是,如果你正在使用

val conf = new SparkConf().setMaster("local[*]") 

和你有額外的核心可用於Spark,它將根據它對數據進行分區,以便Spark能夠並行執行任務。

要獲得分區火花的數量已經作出:

println(partitions.length) 
//For my machine it was 2 

如果數據被分配,然後將分選在該分區僅,並在從每個分區輸出端被合併在的元素來完成。爲了避免這種情況,您可以在sortByKey方法中將numPartition強制爲1,並將數據放入一個分區,然後對其進行排序。

textFile.sortByKey(numPartitions = 1).foreach(println) 

這將使分割成1和你會得到整個輸入數據正確排序的輸出。

+1

這個答案如此救了我的一天,謝謝! –

+0

爲了獲得RDD調用分區的數量'getNumPartitions()' – wbmrcb

0

在這裏,我提供數據集和代碼來執行按鍵排序的功能,如果你不覺得有幫助,那麼請提供我們的代碼,我們將研究這個問題。

數據 - > (製表符分隔文件)

EMP_NAME EMP_ID SALARY 
Adam 22 100 
Bob 25 102 
Bob 28 104 
Chris 29 110 

代碼 - >

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

/* 
* @author Kshitij Kulshrestha 
*/ 

object A1 { 
def main(args: Array[String]): Unit = { 

// set up environment 
val sparkHome = "/usr/spark_pack/spark-1.4.1-bin-hadoop2.4/"; 
val sparkMasterUrl = "spark://SYSTEMX:7077"; 

val conf = new SparkConf() 
.setAppName("A1") 
.setMaster("local[2]") 
.setSparkHome(sparkHome) 

val sc = new SparkContext(conf) 

val dataRDD = sc.textFile("src/Source/A1_data").filter { !_.contains("EMP_NAME") } 
.map { x => 
{ 
val temp = x.split("\t") 

((temp(0)), (temp(1), temp(2))) 
} 
} 

val sortedDataRDD = dataRDD coalesce(1) sortByKey() 
sortedDataRDD foreach (println(_)) 

} 
} 

輸出 - >

(Adam,(22,100)) 
(Bob,(25,102)) 
(Bob,(28,104)) 
(Chris,(29,110)) 
+0

這個代碼在集羣中不會工作,或者當它作爲本地[*]分區數不會保持爲1時。 –

+0

檢查它,它將工作。 –

0

的Python:

sc.parallelize([['Chris',29,110],['Bob',28,104],['Bob',25,102],['Adam',22,100]]).groupBy(lambda x: x[0]).sortByKey().flatMap(lambda x: list(x[1])).collect() 

[[ '亞當',22,100],[ '鮑勃',25,102],[ '鮑勃',28,104],[ '克里斯',29,110]]

斯卡拉:

sc.parallelize(List(Array("Chris",29,110),Array("Bob",28,104),Array("Bob",25,102),Array("Adam",22,100))).groupBy(x => x(0).asInstanceOf[String]).sortByKey().flatMap(x=> x._2).collect() 

數組[數組[不限] =陣列(陣列(亞當,22,100),陣列(鮑勃,28,104),陣列(鮑勃,25,102),陣列(克里斯,29,110))

你可能想把其他列一個如果你想將它們包含在你的分類標準中,那麼它就是你的密鑰的一部分。所以在上面的例子中,第二列的Bob排序不會在那裏。