我已經約70百萬行的數據集的用戶的位置和日期時間CSV,並寫下了下面的代碼,以平均最高100個用戶的點數:星火需要0.5秒平均100號
val spark = org.apache.spark.sql.SparkSession.builder
.appName("Test")
.getOrCreate
import spark.implicits._
val watch = new Stopwatch()
watch.start()
val schema = new StructType().add("user_id", StringType).add("datetime", LongType)
val df = spark.read.format("csv").option("header", "true").schema(schema).csv(inputFile)
df.createOrReplaceTempView("paths")
val pathDs = spark.sql("select user_id, min(datetime) as started, max(datetime) as finished, " +
"count(*) as total, max(datetime) - min(datetime) as timeDelta " +
"from paths group by user_id order by total desc limit 100")
pathDs.cache()
pathDs.collect.foreach(println)
println(watch.elapsedTime(TimeUnit.MILLISECONDS))
val avgPoints = pathDs.select(avg("total")).as[Double].head()
println(avgPoints)
println(watch.stop())
這裏發生的事情是我花費了數百萬/億的記錄(最終可能會花費數TB),並將它們彙總爲100列5列的記錄。問題不在於這部分需要多長時間,或者我們如何加快速度,而在於我們處理最終的100條記錄時會發生什麼。
還有一種更簡單的方法可以直接通過SQL完成此操作,但我還需要pathDS以便稍後進行更多處理。該代碼工作正常,但我注意到pathDs.select(avg("total")).as[Double].head()
開始做了大量的工作,並最終花費了大約半秒,即使pathDS只包含100行。
你知道爲什麼它需要這麼長時間,我怎麼能加快這個速度,特別是在這個只有100行的小數據集上運行?我專門做了.cache和.collect,以便在進行任何進一步聚合之前在本地創建所有100條記錄(而且我現在正在本地運行此操作)。
我在本地使用Scala 2.11上的Spark 2.2。
我在這裏沒有處理小數據 - 完整的數據集將會有數百GB/TB。但是我特別想看看爲什麼這個特定的例子很慢,因爲我認爲做.cache()應該讓我的100行數據集保留在內存中,並且它的結果應該立即出現。 – kozyr
我建議檢查一下SparkUI中的一些高級指標,找出爲什麼它需要這麼長時間。我還建議通過此代碼運行至少一個數據集,例如1000個值,這樣您可以更好地推斷常量開銷與實際(O(n))的計算時間。 –