2

我已經約70百萬行的數據集的用戶的位置和日期時間CSV,並寫下了下面的代碼,以平均最高100個用戶的點數:星火需要0.5秒平均100號

val spark = org.apache.spark.sql.SparkSession.builder 
    .appName("Test") 
    .getOrCreate 

import spark.implicits._ 

val watch = new Stopwatch() 
watch.start() 
val schema = new StructType().add("user_id", StringType).add("datetime", LongType) 

val df = spark.read.format("csv").option("header", "true").schema(schema).csv(inputFile) 
df.createOrReplaceTempView("paths") 

val pathDs = spark.sql("select user_id, min(datetime) as started, max(datetime) as finished, " + 
    "count(*) as total, max(datetime) - min(datetime) as timeDelta " + 
    "from paths group by user_id order by total desc limit 100") 

pathDs.cache() 
pathDs.collect.foreach(println) 
println(watch.elapsedTime(TimeUnit.MILLISECONDS)) 
val avgPoints = pathDs.select(avg("total")).as[Double].head() 
println(avgPoints) 
println(watch.stop()) 

這裏發生的事情是我花費了數百萬/億的記錄(最終可能會花費數TB),並將它們彙總爲100列5列的記錄。問題不在於這部分需要多長時間,或者我們如何加快速度,而在於我們處理最終的100條記錄時會發生什麼。

還有一種更簡單的方法可以直接通過SQL完成此操作,但我還需要pathDS以便稍後進行更多處理。該代碼工作正常,但我注意到pathDs.select(avg("total")).as[Double].head()開始做了大量的工作,並最終花費了大約半秒,即使pathDS只包含100行。

你知道爲什麼它需要這麼長時間,我怎麼能加快這個速度,特別是在這個只有100行的小數據集上運行?我專門做了.cache和.collect,以便在進行任何進一步聚合之前在本地創建所有100條記錄(而且我現在正在本地運行此操作)。

我在本地使用Scala 2.11上的Spark 2.2。

+0

我在這裏沒有處理小數據 - 完整的數據集將會有數百GB/TB。但是我特別想看看爲什麼這個特定的例子很慢,因爲我認爲做.cache()應該讓我的100行數據集保留在內存中,並且它的結果應該立即出現。 – kozyr

+0

我建議檢查一下SparkUI中的一些高級指標,找出爲什麼它需要這麼長時間。我還建議通過此代碼運行至少一個數據集,例如1000個值,這樣您可以更好地推斷常量開銷與實際(O(n))的計算時間。 –

回答

3

Spark針對大數據集進行優化。這意味着通常會有一些開銷對於大數據集可以忽略不計,但對於小數據集則不會忽略不計。

考慮當您運行計算avgPoints會發生什麼:

  1. 星火計算「轉型」,即它定義需要什麼樣的計算做(這是選擇的部分,平均等)。
  2. 你稱之爲「頭部」動作,它使得火花把你製作的表達樹變成一個物理計劃。這包括優化以及比較幾種可能的解決方案。請注意,該表達式還包含計算緩存部分的表達式。在實踐中,這些步驟將被跳過(你可以在Spark UI中看到這個),但是它們仍然被認爲是在某些情況下可能會決定重新計算一些緩存數據(在這種情況下幾乎肯定不會)。
  3. Spark使用整個階段代碼生成將物理計劃編譯成代碼,序列化此代碼並將其發送給所有相關的執行者。
  4. 當spark創建計劃時,它將數據分區(可能是200個分區,因爲這是groupby的默認分區)。這意味着你在執行者之間分配了200個任務。大多數分區將有0或1個元素,因此他們所做的任務幾乎立即執行,但火花必須啓動200個任務。
  5. Spark將200個任務中的每個任務的結果發送到緩衝區,並將它們全部發送給單個執行程序以完成最終聚合。在所有任務完成併發送其數據之前,最終的聚合任務不會開始。
  6. 一旦最終的聚合完成,結果將被髮送回驅動程序。

正如你可以看到這裏有很多階段,包括網絡傳輸和開始/結束任務(需要管理)。即使沒有真實數據,這裏的總體開銷也很容易達到半秒。

如果將限制更改爲1000,即使處理10倍數據,也可能看到總體時間變化很小。

這是一個常見的使用情況,使用火花來減少問題的大小,即你有大量的數據,做一些聚合,並獲得更少的元素(在你的情況下爲100),然後你會收集它們到驅動程序,並直接採取行動,而不是使用火花,以避免開銷(例如,在你的情況下保存收集的結果,而不是用println做foreach,總結一下)。

你可以做的一件事情是在計算pathDs時使用coalesce(1)。這意味着你只有一個分區(所有的連接都將成爲第一階段的一部分)。這與使用收集結果並沒有太大區別,只是如果要將限制更改爲較大的大小,那麼合併爲較小但不是1的值可能很有用(例如,您可以設置10000的限制,然後將其合併爲4仍然有一些平行)。

更新

基礎上的評論限制的結果是當前1分區,以便凝聚不會幫助(這也意味着沒有真正的理由不這樣做,除非收集您要使用數據幀功能結果)。上述過程仍然正確,只是使用了一個分區而不是多個分區。

+0

我建議刪除最後一段。這完全沒用,特別是當你考慮時,'pathDs'將總是有1個分區。 – zero323

+0

@ zero323是否始終如此?分區數量不會取決於實際的限制數量和每個分區的密鑰數量? –

+0

所以,在我的代碼,我專門做了 pathDs.cache(的println) 計算平均值,以爲這實際上將拉低結果到驅動程序之前 pathDs.collect.foreach - 但它似乎並不像這就是發生了什麼事。什麼是收集這裏的正確方法? 請注意,如果我收集並操作收集的數據集,結果會更快,因爲我只是在本地Scala對象上工作,但如果我收集,打印,然後在pathDS上執行其他操作(例如在我的代碼中),它仍然是0.5秒。有沒有更好的方法來做緩存/收集? – kozyr

0

優化它的一種方法是使用函數collect將整個數據集放入內存中,然後使用規範的Scala操作,您可以在1-2 ms內完成該操作?但這首先反駁了使用Spark的原因。

Spark的優勢在於在不同機器上的多個節點間高效地執行分佈式計算。小數據集上的操作在不通過Spark傳遞的情況下總是更高效。你試驗它的時機相當於747飛行100米。現在你想知道爲什麼747這麼慢,當每個人都說飛行讓你如此之快的時候。

在使用RDDs在Spark中執行工作的舊方式中,在版本1.2 - > 1.6左右,可以使用像mapPartitionsWithIndex這樣的函數對分區數據執行正常的scala操作,以避免火花消耗。這當然意味着在這個函數中,所有的數據都已經在火花節點級別被隔離了。使用這種方法,您將獲得兩個世界的好處。

+0

事情是,我運行收集我的數據集。我也運行.cache() – kozyr

+0

另外,我沒有在小數據集上運行 - 這將在TB中進行生產,如果在這裏計算平均值爲100的數字在某種程度上取決於我的初始數據集,那麼這將花費比0.5秒長很多,這會很麻煩。即假設這是我的使用案例 - 使用TB的GPS點,獲得前100條路徑,並計算出一些平均值。 – kozyr

-2
  1. 測量不cache你丟掉時加載數據時到cache本身。可能會更快。
  2. 您能否將input data轉換爲parquet並將其加載到相同羣集中的存儲器存儲器中,例如alluxio?如果是,partition它由user_id。理想情況下,設計體系結構使新的輸入數據被推送到kafka,一個structured streaming作業將其附加到alluxiocassandra,另一個在所選範圍內進行聚合。另外,請使用flink或者batchstream,因爲它通常更快。

,如果你不能控制輸入數據結構,然後重點給予2DN階段,並嘗試使用typed aggregates如:

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, 
    T] 

,你將需要創建數據csv設置爲:

case class Input(userId: String, time: DateTime) 

val ds = spark.read.format("csv").option("header", 
"true").schema(schema).csv(inputFile).as[Input] 

ds.groupByKey(_.userId).avg(_.time).show 

由於鍵入的性能好處,對於大數據集肯定會更快,但對於較小的數據集可能不會更快

+0

問題是,這裏有兩個階段 - 一個加載大量數據(不關心需要多長時間),然後將它彙總到100條記錄中。然後第二階段計算這100條記錄中單個列的平均值,這需要0.5秒,這非常緩慢。 – kozyr

+0

我不認爲這會給我帶來任何性能優勢,或者真正解釋我所看到的情況,它只是我遇到問題的小數據集。 – kozyr

+0

對於大型數據集,由於類型化的性能優勢,肯定會更快__它肯定不會:) – zero323