4

我做的電影recommandation系統,利用現有的MovieLens數據集在這裏: http://grouplens.org/datasets/movielens/OutOfBoundsException與ALS - 弗林克MLlib

要計算該recommandation系統,我用弗林克的ML庫Scala和particulalrly ALS算法(org.apache.flink.ml.recommendation.ALS)。

我第一次在電影的收視率映射到DataSet[(Int, Int, Double)],然後創建一個trainingSettestSet(見下面的代碼)。

我的問題是,當我在整個數據集(所有評級)中使用ALS.fit函數時沒有任何錯誤,但是如果我只刪除一個評級,那麼fit函數不再有效,而我不明白爲什麼。

你有什麼想法嗎? :)使用

代碼:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double) 

PreProcessing.scala

object PreProcessing { 

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = { 
     env.readCsvFile[(Int, Int, Double)](
     ratingsPath, ignoreFirstLine = true, 
     includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)} 
} 

Processing.scala

object Processing { 
    private val ratingsPath: String = "Path_to_ratings.csv" 

    def main(args: Array[String]) { 

    val env = ExecutionEnvironment.getExecutionEnvironment 

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath) 

    val trainingSet : DataSet[(Int, Int, Double)] = 
    ratings 
    .map(r => (r.userId, r.movieId, r.rating)) 
    .sortPartition(0, Order.ASCENDING) 
    .first(ratings.count().toInt) 

    val als = ALS() 
    .setIterations(10) 
    .setNumFactors(10) 
    .setBlocks(150) 
    .setTemporaryPath("/tmp/tmpALS") 

    val parameters = ParameterMap() 
    .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem 
    .add(ALS.Seed, 42L) 

    als.fit(trainingSet, parameters) 
    } 
} 

「但如果我只是刪除只有一個等級」

val trainingSet : DataSet[(Int, Int, Double)] = 
    ratings 
    .map(r => (r.userId, r.movieId, r.rating)) 
    .sortPartition(0, Order.ASCENDING) 
    .first((ratings.count()-1).toInt) 

錯誤:

2015年6月19日15時00分24秒協同組(協同組在org.apache.flink.ml.recommendation.ALS $ .updateFactors(ALS.scala:570))(4/4)切換爲FAILED

java.lang.ArrayIndexOutOfBoundsException:5

在org.apache.flink.ml.recommendation.ALS $ BlockRating.apply(ALS.scala:358)

在org.apache.flink.ml.recommendation。 ALS $$匿名$ 111.coGroup(ALS.scala:635)

在org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)

...

回答

7

問題是first運營商結合Flink的實施的setTemporaryPath參數。爲了理解這個問題,讓我快速解釋阻塞ALS算法的工作原理。

交替最小二乘的阻塞實現首先將給定的評分矩陣用戶智能和項目智能劃分爲塊。對於這些塊,計算路由信息。此路由信息指出哪個用戶/項目塊分別接收來自哪個項目/用戶塊的哪個輸入。之後,ALS迭代開始。由於Flink的底層執行引擎是一個並行流式數據流引擎,因此它試圖以流水線方式儘可能多地執行數據流。這就要求所有管線的操作員同時在線。這具有Flink避免實現可能過大的中間結果的優點。缺點是可用內存必須在所有運行的操作員之間共享。在個體(例如,用戶/項目塊)的大小相當大的ALS的情況下,這不是期望的。

爲了解決這個問題,如果你已經設置了temporaryPath,那麼不是所有的執行操作符都是同時執行的。該路徑定義了可以存儲中間結果的位置。因此,如果您已經定義了臨時路徑,則ALS首先計算用戶塊的路由信息​​並將它們寫入磁盤,然後計算項目塊的路由信息​​並將它們寫入磁盤,最後但不是最少ALS迭代,它從臨時路徑讀取路由信息。

用戶和項目塊的路由信息​​的計算都取決於給定的評分數據集。在您計算用戶路由信息的情況下,它將首先讀取收視率數據集並在其上應用first運營商。 first運算符返回底層數據集中的任意元素n。現在的問題是Flink不存儲這個first操作的結果來計算物品路由信息。相反,當您開始計算物品路由信息時,Flink將從其源重新開始執行數據流。這意味着它從磁盤讀取額定值數據集,並再次應用first運算符。在許多情況下,與第一個first操作的結果相比,這會給您帶來不同的評分。因此,生成的路由信息​​不一致,並且ALS失敗。

您可以通過實現運算符first的結果來避開該問題,並將此結果用作ALS算法的輸入。對象FlinkMLTools包含一個方法persist,它需要一個DataSet,將它寫入給定的路徑,然後返回一個新的DataSet,它讀取剛寫入的DataSet。這使您可以分解結果數據流圖。

val firstTrainingSet : DataSet[(Int, Int, Double)] = 
    ratings 
    .map(r => (r.userId, r.movieId, r.rating)) 
    .first((ratings.count()-1).toInt) 

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training") 

val als = ALS() 
    .setIterations(10) 
    .setNumFactors(10) 
    .setBlocks(150) 
    .setTemporaryPath("/tmp/tmpALS/") 

val parameters = ParameterMap() 
    .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem 
    .add(ALS.Seed, 42L) 

als.fit(trainingSet, parameters) 

或者,您可以嘗試保留temporaryPath未設置。然後所有步驟(路由信息計算和等價迭代)都以流水線方式執行。這意味着用戶和項目路由信息計算都使用與運算符first相同的輸入數據集。

Flink社區目前正致力於在內存中保持運算符的中間結果。這將允許固定運算符的結果,以便它不會被計算兩次,因此由於其非確定性性質而不會給出不同的結果。

+0

謝謝!完美的作品! 只是一個問題:當你使用'persist'時,你將路徑設置爲一個文件。但是'ALS'如何知道這個持久化文件是爲了它而不是爲了程序的其他部分(例如,如果我們必須爲另一個算法「堅持」)呢? – Kerial

+0

'persist'函數調用觸發數據流的執行。因此,存儲在相同路徑下的任何文件都將被覆蓋。只有在觸發後續部分工作時纔會讀取結果。這意味着您可以在理論上刪除或覆蓋此文件。因此,您應該嘗試在算法範圍內分配唯一的文件名。或者,您也可以創建碰撞概率較低的隨機文件名。 –