I have a very large file. For every bigram (pair of adjacent words) on every line of the file, I have to check the entire file. What I'm doing in Scala is obviously wrong, but I don't know how to fix it: iterating over a huge list causes "GC overhead limit exceeded".
This function returns all the lines of that file:
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.util.ArrayList

// Reads the whole file into memory and returns every line.
def allSentences(): ArrayList[String] = {
  val res: ArrayList[String] = new ArrayList[String]()
  val filename = "/path/test.txt"
  val fstream: FileInputStream = new FileInputStream(filename)
  val br: BufferedReader = new BufferedReader(new InputStreamReader(fstream))
  var strLine: String = null
  while ({ strLine = br.readLine(); strLine != null }) {
    res.add(strLine)
  }
  br.close()
  res
}
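For comparison, the same read in more idiomatic Scala (just a sketch; allSentencesIdiomatic is a name I made up, and the behavior is unchanged: every call still loads all lines into memory):

import scala.io.Source

// Same behavior as allSentences(), written with scala.io.Source.
def allSentencesIdiomatic(): List[String] = {
  val src = Source.fromFile("/path/test.txt")
  try src.getLines().toList
  finally src.close()
}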
And this is how I use it (there are about 3 million lines!):
// Assumed to be in scope in the original: implicit Java-to-Scala conversions,
// so .map works on the java.util.ArrayList returned by allSentences().
import scala.collection.JavaConversions._

val p = sc.textFile("file:///path/test.txt")

// Count every bigram in the file: ("word1 word2", count).
val result11 = p
  .flatMap(line => biTuple(line))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// For every bigram, pair it with every sentence of the file, then aggregate.
val result10 = result11
  .flatMap { tuple => allSentences().map(tuple._1 -> _) }
  .map(tuple => (tuple._1, count10(tuple._1, tuple._2)))
  .reduceByKey(_ + _)
I'm almost certain the problem is in this line: .flatMap { tuple => allSentences().map(tuple._1 -> _) }, which re-reads the entire file for every single tuple. But is there any other way to do this!?
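For illustration only, a minimal sketch of one alternative I can think of, assuming the sentence list fits in executor memory as a broadcast variable (sentencesBc is a name I made up):

import scala.collection.JavaConversions._

// Sketch: load the sentences once on the driver and broadcast them, so the
// flatMap iterates an in-memory copy instead of re-reading the file per tuple.
val sentencesBc = sc.broadcast(allSentences())

val result10 = result11
  .flatMap { tuple => sentencesBc.value.map(tuple._1 -> _) }
  .map(tuple => (tuple._1, count10(tuple._1, tuple._2)))
  .reduceByKey(_ + _)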
P.S.: biTuple() returns an ArrayList of all the bigrams of a line. count10() returns 1 if the first bigram exists but the second one doesn't. result11 is an RDD of all bigrams and their counts, as ("word1 word2", count).
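biTuple() itself isn't shown here; a hypothetical sketch consistent with that description, assuming simple whitespace tokenization, would be:

import java.util.ArrayList

// Hypothetical biTuple(): every pair of adjacent words of a line, as "word1 word2".
def biTuple(line: String): ArrayList[String] = {
  val res = new ArrayList[String]()
  val words = line.split("\\s+").filter(_.nonEmpty)
  for (pair <- words.sliding(2) if pair.length == 2)
    res.add(pair.mkString(" "))
  res
}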
Here is the error output:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Note that I have SPARK_WORKER_MEMORY=90G and SPARK_DRIVER_MEMORY=90G.
Could you add the error output? We need to know whether the GC happens in the workers or in the driver. –
Thanks @ThiagoBaldim, I just added the error output. –