2017-04-03 49 views
0

我使用Spark來處理某些語料庫,並且需要計算每個2-gram的發生次數。我開始計算元組(wordID1, wordID2),它工作正常,除了由於大量的小元組對象造成的大內存使用和gc開銷。然後我試圖將一對Int放入Long,gc開銷確實減少了很多,但運行時間也增加了幾次。Spark:flatMap/reduceByKey在某些發行版上似乎很慢,使用Long鍵

我用不同分佈的隨機數據進行了一些小實驗。性能問題似乎只發生在指數分佈式數據上。

// lines of word IDs 
val data = (1 to 5000).par.map({ _ => 
    (1 to 1000) map { _ => (-1000 * Math.log(Random.nextDouble)).toInt } 
}).seq 

// count Tuples, fast 
sc parallelize(data) flatMap { line => 
    val first = line.iterator 
    val second = line.iterator.drop(1) 
    for (pair <- first zip(second)) 
    yield (pair, 1L) 
} reduceByKey { _ + _ } count() 

// count Long, slow 
sc parallelize(data) flatMap { line => 
    val first = line.iterator 
    val second = line.iterator.drop(1) 
    for ((a, b) <- first zip(second)) 
    yield ((a.toLong << 32) | b, 1L) 
} reduceByKey { _ + _ } count() 

的工作分爲兩個階段,flatMap()count()。當計數Tuple2秒,flatMap()需要約6秒和count()需要約2秒,而當計數Long秒,flatMap()需要18秒和count()需要10秒。

作爲Long,我沒有任何意義,因爲它應該比Tuple2開銷少。對於Long密鑰,spark是否具有一些特殊功能?對於某些特定的分配,這種情況會發生得更糟?

+0

那麼......除了性能,你忘記了'長'沒有實際無符號,你也有你的密鑰負數。這個「符號」位會擾亂你的「左移+按位或可預測性」。因此,在這種情況下,「長」的方法會給你帶來很多問題。 –

+0

@SarveshKumarSingh不,鑰匙永遠不會消極。這是從指數分佈採樣的標準方法。唯一可能的缺陷是採樣可能溢出並導致2147483647,這也是正面的。此外,上面的代碼僅僅是一個演示,在真實世界的案例中將使用有效的單詞id。 –

+0

好的。現在我們可以繼續尋找可能的答案,這只是我的猜想。因此,您應該已經知道'RDD'被分區爲多個分區,'reduceByKey'會使用基於'keys'的新分區shceme重新劃分這些分區。而巨大的「長」數字(密鑰在很大範圍內傳播)混淆了分區方案,導致太多稀疏分區,這是導致您的性能不佳的原因。 –

回答

1

感謝@ SarveshKumarSingh的暗示,我終於解決了這個問題。這不是Spark的Long專門化引發的問題,但Java和Spark沒有正確解決它。

Java's hashCode() for Long很簡單,強烈地依賴於值的兩半,並根據他們的hashCode()值星火的默認HashPartitioner簡單分區鍵模分區號。這些使得Spark的默認分區對Long密鑰的分發非常敏感,特別是當分區數量相對較少時。在我的情況下,情況惡化,因爲鍵是通過連接Int對來構造的。

這個解決方案會非常簡單,因爲我們只是需要以某種方式「洗牌」鍵,這使得具有相似頻率的鍵均勻分佈。

最簡單的方法是使用some perfect hash function將每個密鑰映射到另一個唯一值,並在需要原始密鑰時將其轉換回來。這種方法只涉及較小的代碼更改,但可能無法很好地執行。我使用以下映射實現了類似於逐元組方法的性能。

val newKey = oldKey * 6364136223846793005L + 1442695040888963407L 
val oldKey = (newKey - 1442695040888963407L) * -4568919932995229531L 

更有效的方法是替換默認HashPartitioner。我在flatMapreduceByKey之間使用了以下分區器,並在現實世界的數據上實現了兩倍的性能提升。

val prevRDD = // ... flatMap ... 
val nParts = prevRDD.partitioner match { 
    case Some(p) => p.numPartitions 
    case None => prevRDD.partitions.size 
} 

prevRDD partitionBy (new Partitioner { 
    override def getPartition(key: Any): Int = { 
    val rawMod = LongHash(key.asInstanceOf[Long]) % numPartitions 
    rawMod + (if (rawMod < 0) numPartitions else 0) 
    } 
    override def numPartitions: Int = nParts 
}) reduceByKey { _ + _ } 

def LongHash(v: Long) = { // the 64bit mix function from Murmurhash3 
    var k = v 
    k ^= k >> 33 
    k *= 0xff51afd7ed558ccdL 
    k ^= k >> 33 
    k *= 0xc4ceb9fe1a85ec53L 
    k ^= k >> 33 
    k.toInt 
} 
相關問題