2017-10-20 61 views
0

我在本地運行Spark並且出現了一個奇怪的問題。基本上,我可以使用DataFrame的show()方法輸出任意數量的行,但是,當我嘗試使用count()或collect()(甚至是很少量的數據)時,Spark就會停留在該階段。永遠不會完成它的工作。我使用gradle來構建和運行。show()/ count()永遠不會完成while show()快速運行

當我運行

./gradlew clean run 

程序卡住在

> Building 83% > :run 

什麼會導致這個問題? 這是代碼。

val moviesRatingsDF = MongoSpark.load(sc).toDF().select("movieId", "userId","rating") 

    val movieRatingsDF = moviesRatingsDF 
     .groupBy("movieId") 
     .pivot("userId") 
     .max("rating") 
     .na.fill(0) 

    val ratingColumns = movieRatingsDF.columns.drop(1) // drop the name column 

    val movieRatingsDS:Dataset[MovieRatingsVector] = movieRatingsDF 
     .select(col("movieId").as("movie_id"), array(ratingColumns.map(x => col(x)): _*).as("ratings")) 
     .as[MovieRatingsVector] 

    val moviePairs = movieRatingsDS.withColumnRenamed("ratings", "ratings1") 
     .withColumnRenamed("movie_id", "movie_id1") 
     .crossJoin(movieRatingsDS.withColumnRenamed("ratings", "ratings2").withColumnRenamed("movie_id", "movie_id2")) 
     .filter(col("movie_id1") < col("movie_id2")) 

    val movieSimilarities = moviePairs.map(row => { 
     val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1")) 
     val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2")) 
     val corr:Double = Statistics.corr(ratings1, ratings2) 

     MovieSimilarity(row.getAs[Long]("movie_id1"), row.getAs[Long]("movie_id2"), corr) 
    }).cache() 


    val collectedData = movieSimilarities.collect() 
    println(collectedData.length) 

    log.warn("I'm done") //never gets here 

    close 

enter image description here

+0

我可能在這裏是錯的,但我認爲你不應該在'moviePairs'數據集的轉換中創建新的RDD? (我指的是兩個'sc.parallelize(。)') –

+0

我也覺得它有點低效。但是Statistics.corr方法需要一對RDD。如果我有辦法將2個向量傳遞給它,會很好。但這種情況並非如此。無論如何,這個任務看起來很好(如果我之後打印一些東西 - 它不會花費很長時間)。 –

回答

-1

星火確實懶惰的評價,並創建RDD/DF的時候一個動作被調用。

要回答你的問題

1。在收集/計數你調用兩個不同的動作,櫃面如果你不 堅持數據,這將導致RDD/DF進行重新評估,因此 比預期更多的時間。

  1. 在顯示中只有一個動作。它只顯示前1000行(手指交叉 ),因此它結束