1

尋找專業知識在下面指導我解決問題。當應用pyspark ALS的「recommendProductsForUsers」(儘管> 300GB Ram集羣可用)時,StackOverflow錯誤

背景:

  • 我試圖讓與靈感的this example
  • 作爲部署的基礎設施我使用谷歌雲Dataproc集羣基本PySpark腳本去。
  • 基石在我的代碼是功能「recommendProductsForUsers」記載here這使我回頂的X產品,爲所有用戶在模型

問題,我承擔

  • 的ALS。培訓腳本運行平穩,並在GCP上很好地擴展(輕鬆> 100萬用戶)。

  • 然而,應用預測:即使用函數'PredictAll'或'recommendationProductsForUsers',根本不縮放。我的腳本平滑運行一個小數據集(< 100 Customer with < 100 products)。但是,它帶來的業務相關的大小的時候,我不管理它的規模(例如> 50K客戶和> 10K產品)

  • 錯誤,那麼我得到的是如下:

    16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager: 
        Lost task 22.0 in stage 411.0 (TID 15139, 
        productrecommendation-high-w-2.c.main-nova-558.internal): 
        java.lang.StackOverflowError 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
         at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
         at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) 
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
         at java.lang.reflect.Method.invoke(Method.java:498) 
         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
         at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    
  • 我甚至爲獲得一個300 GB的羣集(1個108GB的主節點+2個108 GB RAM的節點)來嘗試運行它;它的工作原理爲50K的客戶,但沒有什麼更多的

  • 野心是有一個設置在那裏我可以爲> 80萬個客戶

運行詳細

代碼行失敗

predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2]))) 
pprint.pprint(predictions.take(10)) 
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)]) 
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates() 

你如何建議繼續?我覺得腳本結尾處的「合併」部分(即,當我將其寫入dfToSave時)會導致錯誤;有沒有辦法繞開這個零件保存?

回答

1

從堆棧跟蹤Spark gives a StackOverflowError when training using ALS

基本上,星火表示RDD血統遞歸讓你深深嵌套對象結束時,事情還沒有偷懶過評估的過程中,這似乎是同樣的問題,迭代工作量。調用sc.setCheckpointDir並調整檢查點間隔將減少此RDD沿襲的長度。

+0

嗨丹尼斯,謝謝你的想法。我確實看到了其他線程,我同意ALS.train確實有一個可以自定義的檢查點間隔參數。然而,預測所有或者推薦的產品用戶功能都有這個參數;然後檢查點如何工作? –

+0

更新:已實施檢查點(感謝dennis提示)。儘管它可以很好地擴展ALS.train功能(容易超過1百萬的用戶),但它不適用於預測:i。e。使用PredictAll的功能或推薦產品爲用戶。對此有何建議? –

+0

在應用檢查點之後,當拋出異常或發生異常時,是否還會看到涉及ObjectInputStream的相同堆棧跟蹤? –