2013-08-26 15 views
0

我在下面的代碼中維護一個大的List:我在這裏做的是遍歷數據流並創建一個倒排索引。我使用Twitter燙傷API和dataTypePipe是TypedPipe什麼是Scala-Scalding中求和列表的替換

lazy val cats = dataTypePipe.cross(cmsCats) 
    .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2)) 
    .flatMap { 
    case (id, categorySet, cHhitters) => categorySet.map(cat => (
    ... 
    } 
    .filter(f => f._2.nonEmpty) 
    .group.withReducers(4000) 
    .sum 
    .map { 
    case ((token,bucket), ids) => 
     toIndexedRecord(ids, token, bucket) 
    } 

的類型由於序列化問題,我轉換斯卡拉列表,Java列表,並使用Avro公司來寫:

def toIndexedRecord(ids: List[Long], token: String, bucket: Int): IndexRecord = { 
    val javaList = ids.map(l => l: java.lang.Long).asJava //need to convert from scala long to java long 
    new IndexRecord(token, bucket,javaList) 
    } 

但問題是大量的保存在列表中的信息會導致Java堆問題。我認爲總結也是這個問題的貢獻者

2013-08-25 16:41:09,709 WARN org.apache.hadoop.mapred.Child: Error running child 
cascading.pipe.OperatorException: [_pipe_0*_pipe_1][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value'] 
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136) 
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39) 
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49) 
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28) 
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90) 
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133) 
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) 
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) 
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) 
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176) 
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127) 
    at scala.collection.immutable.List.$plus$plus(List.scala:193) 
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86) 
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84) 
    at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264) 
    at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279) 
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128) 

所以我的問題是我能做些什麼來避免這種情況。

回答

3

中的.sum之前嘗試.forceToReducers。這個OOM正在發生地圖側,因爲我們正在緩存值。這可能無助於你的情況。

但是,如果列表真的太大,真的很少能做到。

1

快,但不可擴展的答案:嘗試增加mapred.child.java.opts

更好的答案,那麼這是一個有點棘手,瞭解問題,因爲我不知道你的類型丘壑,我不知道是什麼fvf因爲你沒有給他們提供信息。如果您提供所需的最少量代碼,以便我可以粘貼到IDE中並進行遊戲,那麼我可能會發現您的問題。

sumsum可能是OOM發生的地方,但它不是造成它的原因 - 重構以不同的方式完成並不會有幫助。

機會是你穿越太大而不適合記憶的東西。所以mapred.child.java.opts可能是唯一的解決方案,除非你完全重組你的數據。注意cross電話crossWithTiny,現在微小意味着微小 :)

相關問題