2016-08-12 18 views
2

我正在嘗試使用文件DStream中的大型RDD進行讀取。在大型Dstream中計算我的RDD的記錄

的代碼如下:

val creatingFunc = {() => 
    val conf = new SparkConf() 
       .setMaster("local[10]") 
       .setAppName("FileStreaming") 
       .set("spark.streaming.fileStream.minRememberDuration", "2000000h") 
       .registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable], 
classOf[org.apache.hadoop.io.Text], classOf[GGSN])) 

    val sc = new SparkContext(conf) 

    // Create a StreamingContext 
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds)) 

    val appFile = httpFileLines 
        .map(x=> (x._1,x._2.toString())) 
        .filter(!_._2.contains("ggsnIPAddress")) 
        .map(x=>(x._1,x._2.split(","))) 

    var count=0 

    appFile.foreachRDD(s => { 
    // s.collect() throw exception due to insufficient amount of emery 
    //s.count() throw exception due to insufficient amount of memory 
    s.foreach(x => count = count + 1) 
    }) 

    println(count) 
    newContextCreated = true 
    ssc 
} 

什麼,我要做的是讓我的RDD..however的數量,因爲它是large..it拋出exception..so我需要做的一個foreach來避免數據收集到內存..

我想要得到的計數,然後在我的代碼的方式,但它總是給0 ..

有沒有辦法做到這一點?

+0

在處理RDD的時候,你不能像這樣在一個局部變量中累加和。您需要使用'org.apache.spark.Accumulator'或者您可以調用'Rdd.count'或'DStream.count' –

+0

您的httpFileLines被創建在哪裏?它是'RDD還是'DStream'? –

+0

你想要計算你的rdds數量還是dstream中所有元素的數量? – Knight71

回答

0

有沒有必要foreachRDD並致電count。您可以使用DStream定義的count方法:

val appFile = httpFileLines 
       .map(x => (x._1, x._2.toString())) 
       .filter(!_._2.contains("ggsnIPAddress")) 
       .map(x => (x._1, x._2.split(","))) 

val count = appFile.count() 

如果仍然得到內存異常量不足,您可能需要每次都計算數據的小批量,或放大你工作節點處理負載。

+0

這不會返回DStream中的所有元素的計數,我仍然需要做foreach .. – Luckylukee