2015-01-06 62 views
4

我是新來的火花,並嘗試寫一些基於火花和火花流的示例代碼。如何在火花流中排序數據

到目前爲止,我已經實現了火花排序功能,這裏是代碼:

def sort(listSize: Int, slice: Int): Unit = { 
    val conf = new SparkConf().setAppName(getClass.getName) 
    val spark = new SparkContext(conf) 
    val data = genRandom(listSize) 
    val distData = spark.parallelize(data, slice) 
    val result = distData.sortBy(x => x, true) 
    val finalResult = result.collect() 
    val step5 = System.currentTimeMillis() 
    printlnArray(finalResult, 0, 10) 
    spark.stop() 
    } 

    /** 
    * generate random number 
    * @return 
    */ 
    def genRandom(listSize: Int): List[Int] = { 
    val range = 100000 
    var listBuffer = new ListBuffer[Int] 
    val random = new Random() 
    for (i <- 1 to listSize) listBuffer += random.nextInt(range) 
    listBuffer.toList 
    } 

    def printlnArray(list: Array[Int], start: Int, offset: Int) { 
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i)) 
    } 

我對火花流實現某種功能的麻煩。據我所知,火花RDD提供火花核心中的排序API,但火花流中沒有這樣的API,有誰知道該怎麼做?謝謝

這是一個轉儲問題,但在網絡上谷歌後,我找不到正確的答案。如果有人知道如何解決它,謝謝你的幫助。

+0

做什麼?後者是 - 就流加工而言 - 一般來說afaik是不可能的。 – dwegener

回答

4

您可以利用DStream的轉換功能通過使用基礎RDD進行轉換。

比如你想流的每個`microbatch`排序還是要全碼流排序

myDStream.transform(rdd=>rdd.sortByKey()) 
+0

謝謝@ Hawk66!想知道我們如何才能對DStreams進行單獨操作?比如說,如果我們只想在每個微博中最頂層的條目。 'myDStream.transform(RDD => rdd.sortByKey())。(1)頂部'OR 'myDStream.transform(RDD => rdd.sortByKey())。變換(RDD => rdd.top(1)) '?沒有任何工作到目前爲止 – Dexter

+0

沒關係。得到我的答案在這個SO鏈接http://stackoverflow.com/questions/41483746/transformed-dstream-in-pyspark-gives-error-when-pprint-called-on-it – Dexter