0
我在本地運行Spark並且出現了一個奇怪的問題。基本上,我可以使用DataFrame的show()方法輸出任意數量的行,但是,當我嘗試使用count()或collect()(甚至是很少量的數據)時,Spark就會停留在該階段。永遠不會完成它的工作。我使用gradle來構建和運行。show()/ count()永遠不會完成while show()快速運行
當我運行
./gradlew clean run
程序卡住在
> Building 83% > :run
什麼會導致這個問題? 這是代碼。
val moviesRatingsDF = MongoSpark.load(sc).toDF().select("movieId", "userId","rating")
val movieRatingsDF = moviesRatingsDF
.groupBy("movieId")
.pivot("userId")
.max("rating")
.na.fill(0)
val ratingColumns = movieRatingsDF.columns.drop(1) // drop the name column
val movieRatingsDS:Dataset[MovieRatingsVector] = movieRatingsDF
.select(col("movieId").as("movie_id"), array(ratingColumns.map(x => col(x)): _*).as("ratings"))
.as[MovieRatingsVector]
val moviePairs = movieRatingsDS.withColumnRenamed("ratings", "ratings1")
.withColumnRenamed("movie_id", "movie_id1")
.crossJoin(movieRatingsDS.withColumnRenamed("ratings", "ratings2").withColumnRenamed("movie_id", "movie_id2"))
.filter(col("movie_id1") < col("movie_id2"))
val movieSimilarities = moviePairs.map(row => {
val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1"))
val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2"))
val corr:Double = Statistics.corr(ratings1, ratings2)
MovieSimilarity(row.getAs[Long]("movie_id1"), row.getAs[Long]("movie_id2"), corr)
}).cache()
val collectedData = movieSimilarities.collect()
println(collectedData.length)
log.warn("I'm done") //never gets here
close
我可能在這裏是錯的,但我認爲你不應該在'moviePairs'數據集的轉換中創建新的RDD? (我指的是兩個'sc.parallelize(。)') –
我也覺得它有點低效。但是Statistics.corr方法需要一對RDD。如果我有辦法將2個向量傳遞給它,會很好。但這種情況並非如此。無論如何,這個任務看起來很好(如果我之後打印一些東西 - 它不會花費很長時間)。 –