2016-10-21 79 views
0

我有包含像這樣如何用scala/spark來計算元素的出現次數?

00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||[email protected]@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125| 

元素的列表通過一系列的步驟的文件,我設法將其轉換爲元素的列表,這樣

(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3)) 

凡和是鍵和6,5,2,2,8,6,5,3是值。值可以是1到8之間的數字。有什麼方法可以用spark來統計這些值的出現次數,所以結果如下所示?

(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8) 

我可以的if else塊和人員,但不會是漂亮做到這一點,所以我想知道,如果有我能在斯卡拉/火花使用任何instrumets。

這是我的代碼。

class ScalaJob(sc: SparkContext) { 
    def run(cdrPath: String) : RDD[(String, Iterable[String])] = { 
    //pass the file 
    val fileCdr = sc.textFile(cdrPath); 

    //find values in every raw cdr 
    val valuesCdr = fileCdr.map{ 
     dataRaw => 
     val p = dataRaw.split("[|]",-1) 
     (p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32))) 
    } 
    val x = valuesCdr.groupByKey() 
    return x 
    } 

有關優化它的任何意見,將不勝感激。我對scala/spark非常陌生。

+0

只是一個忠告,擺脫那些不相關的問題的範圍,代碼和信息,它使你的問題的可讀性。 – cheseaux

+0

您可以查看規範[字數統計示例](http://spark.apache.org/examples.html)。 – erip

回答

1

首先,Scala是一個類型安全的語言,Spark的RDD API也是如此 - 所以強烈建議使用類型系統,而不是通過將所有內容「編碼」爲字符串來繞過它。

所以我會建議一個解決方案,創建一個RDD[(String, Seq[(Int, Int)])](元組中的第二項是一個(ID,count)元組序列),而不是RDD[(String, Iterable[String])],這似乎不太有用。

下面是計數的1的出現到8一個簡單的函數在給定Iterable[Int]

def countValues(l: Iterable[Int]): Seq[(Int, Int)] = { 
    (1 to 8).map(i => (i, l.count(_ == i))) 
} 

您可以使用mapValues使用此功能(放置功能在串行化的對象,像你這樣的做其餘部分)上的RDD[(String, Iterable[Int])]得到結果:

valuesCdr.groupByKey().mapValues(ScalaJob.countValues) 

將全部溶液然後可以簡化一些:

class ScalaJob(sc: SparkContext) { 
    import ScalaJob._ 

    def run(cdrPath: String): RDD[(String, Seq[(Int, Int)])] = { 
    val valuesCdr = sc.textFile(cdrPath) 
     .map(_.split("\\|")) 
     .map(p => (p(1), processType(processTime(p(2)), p(32)))) 

    valuesCdr.groupByKey().mapValues(countValues) 
    } 
} 

object ScalaJob { 
    val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4) 

    def processTime(s: String): Int = { 
    val hour = DateTime.parse(s, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getHourOfDay 
    dayParts.filterKeys(_.contains(hour)).values.head 
    } 

    def processType(dayPart: Int, s: String): Int = s match { 
    case "S" => 2 * dayPart - 1 
    case "V" => 2 * dayPart 
    } 

    def countValues(l: Iterable[Int]): Seq[(Int, Int)] = { 
    (1 to 8).map(i => (i, l.count(_ == i))) 
    } 
} 
相關問題