2015-11-07 123 views
1

我的任務是編寫一個讀取大文件(不適合內存)的代碼將其逆轉並輸出最常用的五個單詞。
我已經寫下了下面的代碼,它完成了這項工作。Spark代碼優化

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object ReverseFile { 
    def main(args: Array[String]) { 


    val conf = new SparkConf().setAppName("Reverse File") 
    conf.set("spark.hadoop.validateOutputSpecs", "false") 
    val sc = new SparkContext(conf) 
    val txtFile = "path/README_mid.md" 
    val txtData = sc.textFile(txtFile) 
    txtData.cache() 

    val tmp = txtData.map(l => l.reverse).zipWithIndex().map{ case(x,y) => (y,x)}.sortByKey(ascending = false).map{ case(u,v) => v} 

    tmp.coalesce(1,true).saveAsTextFile("path/out.md") 

    val txtOut = "path/out.md" 
    val txtOutData = sc.textFile(txtOut) 
    txtOutData.cache() 

    val wcData = txtOutData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(ascending = false) 
    wcData.collect().take(5).foreach(println) 


    } 
} 

的問題是,我是新來的火花和Scala,並且你可以在代碼中看到我第一次讀取文件扭轉它保存然後讀取它扭轉和輸出的五個最頻繁出現的詞彙。

  • 有沒有辦法告訴火花保存TMP和工藝wcData(無需保存,打開文件)在同一時間,否則它像讀取文件的兩倍。
  • 從現在開始,我要解決很多火花,所以如果有代碼的任何部分(不像絕對路徑名......特定的火花),你可能認爲可以寫得更好i'de欣賞它。

回答

2
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object ReverseFile { 
    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("Reverse File") 
    conf.set("spark.hadoop.validateOutputSpecs", "false") 
    val sc = new SparkContext(conf) 
    val txtFile = "path/README_mid.md" 
    val txtData = sc.textFile(txtFile) 
    txtData.cache() 

    val reversed = txtData 
     .zipWithIndex() 
     .map(_.swap) 
     .sortByKey(ascending = false) 
     .map(_._2) // No need to deconstruct the tuple. 

    // No need for the coalesce, spark should do that by itself. 
    reversed.saveAsTextFile("path/reversed.md") 

    // Reuse txtData here. 
    val wcData = txtData 
     .flatMap(_.split(" ")) 
     .map(word => (word, 1)) 
     .reduceByKey(_ + _) 
     .map(_.swap) 
     .sortByKey(ascending = false) 

    wcData 
     .take(5) // Take already collects. 
     .foreach(println) 
    } 
} 

始終執行collect()最後,使星火可以評估集羣上的東西。

+0

拿(5)收集,爲什麼? – eliasah

+0

謝謝你的答案,但幾個筆記:1 - 沒有合併火花將文件保存爲分區我想保存爲一個,2-採取(5)之前collect()給編譯錯誤(沒有足夠的參數收集),3 - 你能否給我提供一個關於所有這個地圖的教程的鏈接(_ + _/_._ 2)不需要的東西。非常感謝 – Epsilon

+1

1 - 好的。根據需要更改。 2 - 修正了收集。 3 - https://stackoverflow.com/questions/8000903/what-are-all-the-uses-of-an-underscore-in-scala#8000934 – Reactormonk

2

代碼中最昂貴的部分是排序,因此明顯的改進就是將其刪除。在完全過時的第二種情況下,這是相對簡單的:

val wcData = txtData 
    .flatMap(_.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) // No need to swap or sort 

// Use top method and explicit ordering in place of swap/sortByKey 
val wcData = top(5)(scala.math.Ordering.by[(String, Int), Int](_._2)) 

行的換向順序有點棘手。首先讓每個分區重新排序元素:

val reversedPartitions = txtData.mapPartitions(_.toList.reverse.toIterator) 

現在你有兩個選擇

  • 使用自定義分區

    class ReversePartitioner(n: Int) extends Partitioner { 
        def numPartitions: Int = n 
        def getPartition(key: Any): Int = { 
        val k = key.asInstanceOf[Int] 
        return numPartitions - 1 - k 
        } 
    } 
    
    val partitioner = new ReversePartitioner(reversedPartitions.partitions.size) 
    
    val reversed = reversedPartitions 
        // Add current partition number 
        .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toList))) 
        // Repartition to get reversed order 
        .partitionBy(partitioner) 
        // Drop partition numbers 
        .values 
        // Reshape 
        .flatMap(identity) 
    

    它仍然需要洗牌,但它是相對便攜,數據仍然是可以訪問在記憶中。

  • 如果您只想保存反轉的數據,您可以撥打saveAsTextFile並在reversedPartitions上進行邏輯重新排序。由於part-n名稱格式標識源分區,所以您只需將part-n重命名爲part-(number-of-partitions - 1 -n)即可。它需要保存數據,因此它不是最優的,但如果你使用內存中的文件系統可以是一個很好的解決方案。

+0

整潔。我正在考慮Partitioner的魔法,但不知道如何實現它。 – Reactormonk