我使用Spark來處理某些語料庫,並且需要計算每個2-gram的發生次數。我開始計算元組(wordID1, wordID2)
,它工作正常,除了由於大量的小元組對象造成的大內存使用和gc開銷。然後我試圖將一對Int
放入Long
,gc開銷確實減少了很多,但運行時間也增加了幾次。Spark:flatMap/reduceByKey在某些發行版上似乎很慢,使用Long鍵
我用不同分佈的隨機數據進行了一些小實驗。性能問題似乎只發生在指數分佈式數據上。
// lines of word IDs
val data = (1 to 5000).par.map({ _ =>
(1 to 1000) map { _ => (-1000 * Math.log(Random.nextDouble)).toInt }
}).seq
// count Tuples, fast
sc parallelize(data) flatMap { line =>
val first = line.iterator
val second = line.iterator.drop(1)
for (pair <- first zip(second))
yield (pair, 1L)
} reduceByKey { _ + _ } count()
// count Long, slow
sc parallelize(data) flatMap { line =>
val first = line.iterator
val second = line.iterator.drop(1)
for ((a, b) <- first zip(second))
yield ((a.toLong << 32) | b, 1L)
} reduceByKey { _ + _ } count()
的工作分爲兩個階段,flatMap()
和count()
。當計數Tuple2
秒,flatMap()
需要約6秒和count()
需要約2秒,而當計數Long
秒,flatMap()
需要18秒和count()
需要10秒。
作爲Long
,我沒有任何意義,因爲它應該比Tuple2
開銷少。對於Long
密鑰,spark是否具有一些特殊功能?對於某些特定的分配,這種情況會發生得更糟?
那麼......除了性能,你忘記了'長'沒有實際無符號,你也有你的密鑰負數。這個「符號」位會擾亂你的「左移+按位或可預測性」。因此,在這種情況下,「長」的方法會給你帶來很多問題。 –
@SarveshKumarSingh不,鑰匙永遠不會消極。這是從指數分佈採樣的標準方法。唯一可能的缺陷是採樣可能溢出並導致2147483647,這也是正面的。此外,上面的代碼僅僅是一個演示,在真實世界的案例中將使用有效的單詞id。 –
好的。現在我們可以繼續尋找可能的答案,這只是我的猜想。因此,您應該已經知道'RDD'被分區爲多個分區,'reduceByKey'會使用基於'keys'的新分區shceme重新劃分這些分區。而巨大的「長」數字(密鑰在很大範圍內傳播)混淆了分區方案,導致太多稀疏分區,這是導致您的性能不佳的原因。 –