2
我正在嘗試使用文件DStream
中的大型RDD進行讀取。在大型Dstream中計算我的RDD的記錄
的代碼如下:
val creatingFunc = {() =>
val conf = new SparkConf()
.setMaster("local[10]")
.setAppName("FileStreaming")
.set("spark.streaming.fileStream.minRememberDuration", "2000000h")
.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text], classOf[GGSN]))
val sc = new SparkContext(conf)
// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
val appFile = httpFileLines
.map(x=> (x._1,x._2.toString()))
.filter(!_._2.contains("ggsnIPAddress"))
.map(x=>(x._1,x._2.split(",")))
var count=0
appFile.foreachRDD(s => {
// s.collect() throw exception due to insufficient amount of emery
//s.count() throw exception due to insufficient amount of memory
s.foreach(x => count = count + 1)
})
println(count)
newContextCreated = true
ssc
}
什麼,我要做的是讓我的RDD..however的數量,因爲它是large..it拋出exception..so我需要做的一個foreach來避免數據收集到內存..
我想要得到的計數,然後在我的代碼的方式,但它總是給0 ..
有沒有辦法做到這一點?
在處理RDD的時候,你不能像這樣在一個局部變量中累加和。您需要使用'org.apache.spark.Accumulator'或者您可以調用'Rdd.count'或'DStream.count' –
您的httpFileLines被創建在哪裏?它是'RDD還是'DStream'? –
你想要計算你的rdds數量還是dstream中所有元素的數量? – Knight71