
I have a very large file. For every bigram (word pair) on every line of the file, I have to check the whole file. What I am doing in Scala is clearly wrong, but I don't know how to fix it. Iterating over a huge list leads to "GC overhead limit exceeded".

This function returns all the lines of the file:

import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.util.ArrayList

def allSentences(): ArrayList[String] = {
  val res: ArrayList[String] = new ArrayList[String]()
  val filename = "/path/test.txt"
  val fstream: FileInputStream = new FileInputStream(filename)
  val br: BufferedReader = new BufferedReader(new InputStreamReader(fstream))
  var strLine: String = null
  // read the file line by line into the list
  while ({strLine = br.readLine(); strLine != null})
    res.add(strLine)
  br.close()
  res
}

And this is how I use it (the file has about 3 million lines!):

val p = sc.textFile("file:///path/test.txt") 
val result11 = p 
      .flatMap(line => biTuple(line)) 
      .map(word => (word, 1)) 
      .reduceByKey(_ + _) 

val result10 = result11 
      .flatMap { tuple => allSentences().map(tuple._1 -> _) } 
      .map(tuple => (tuple._1, count10(tuple._1, tuple._2))) 
      .reduceByKey(_ + _) 

I am almost certain the problem is in .flatMap { tuple => allSentences().map(tuple._1 -> _) }, but is there any other way to do this?

P.S.: biTuple() returns an ArrayList of all bigrams of a line, and count10() returns 1 if the first word of the bigram is present but the second is not. result11 is an RDD of all bigrams and their counts, i.e. ("word1 word2", count).
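biTuple() itself is not shown in the question; purely for context, here is a minimal sketch of what such a helper might look like (the real implementation may differ):

import java.util.ArrayList

// Hypothetical reconstruction of biTuple(): every adjacent word pair of a line
// as a "word1 word2" string, collected in a java.util.ArrayList.
def biTuple(line: String): ArrayList[String] = {
  val res = new ArrayList[String]()
  val words = line.trim.split("\\s+")
  for (i <- 0 until words.length - 1)
    res.add(words(i) + " " + words(i + 1))
  res
}

(For flatMap and .map in the question to accept a java.util.ArrayList, an implicit Java-to-Scala collection conversion such as scala.collection.JavaConversions._ presumably has to be in scope.)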

This is the error output:

java.lang.OutOfMemoryError: GC overhead limit exceeded 
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201) 
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198) 
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152) 
     at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58) 
     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83) 
     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Note that I have SPARK_WORKER_MEMORY=90G and SPARK_DRIVER_MEMORY=90G.

Can you add the error output? We need to know whether the GC is happening in the worker or in the driver. –

Thanks @ThiagoBaldim, I just added the error output. –

Answer


It looks like what you are trying to do is a Cartesian product of result11 with p (your original list of sentences), but you do it by opening and reading the whole file into memory for every single entry of result11. That is bound to put pressure on the garbage collector, although I can't say for certain that it is the cause of your GC problem. Spark has a cartesian method on RDDs which may serve you better, if my interpretation of what you are trying to do is correct. (It will, however, move a lot of data across the network.)
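A minimal sketch of what that could look like, reusing the p, result11 and count10 names from the question (an illustration of the idea, not a tested drop-in replacement):

// Pair every (bigram, count) entry of result11 with every sentence of p,
// letting Spark distribute the cross product instead of re-reading the whole
// file into memory on each executor.
val result10 = result11
  .cartesian(p)                                         // ((bigram, count), sentence)
  .map { case ((bigram, _), sentence) => (bigram, count10(bigram, sentence)) }
  .reduceByKey(_ + _)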

You could also look into whether the count10 logic would be better expressed as a filter operation, reducing the number of entries that ultimately have to go through reduceByKey.
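For instance, if count10 really is a 0/1 indicator, the same aggregation could be written as a filter followed by counting the surviving pairs per bigram, so that fewer records reach the shuffle (again only a sketch under that assumption):

// Keep only the (bigram, sentence) pairs that count10 scores as 1,
// then count the survivors per bigram instead of summing 0/1 values.
val result10 = result11
  .cartesian(p)
  .filter { case ((bigram, _), sentence) => count10(bigram, sentence) == 1 }
  .map { case ((bigram, _), _) => (bigram, 1) }
  .reduceByKey(_ + _)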
