2016-07-30 28 views
2

按字段將數據分區爲預定義分區計數的最佳方式是什麼?什麼是按列分區但保持固定分區數的有效方式?

我目前通過指定partionCount = 600來分區數據。發現計數600可爲我的數據集/羣集設置提供最佳查詢性能。

val rawJson = sqlContext.read.json(filename).coalesce(600) 
rawJson.write.parquet(filenameParquet) 

現在我想通過列「eventName的」分區此數據,但仍然保持計數600的數據,目前約有2000獨特eventNames,加上各eventName的行數不統一。大約10個eventNames有超過50%的數據導致數據傾斜。因此,如果我像下面那樣進行分區,那麼它不是很高效。寫入時間比沒有寫入多5倍。

val rawJson = sqlContext.read.json(filename) 
rawJson.write.partitionBy("eventName").parquet(filenameParquet) 

什麼是這些方案的數據分區的好方法?有沒有辦法通過eventName進行分區,但將其分散到600個分區中?

我的模式是這樣的:

{ 
    "eventName": "name1", 
    "time": "2016-06-20T11:57:19.4941368-04:00", 
    "data": { 
    "type": "EventData", 
    "dataDetails": { 
     "name": "detailed1", 
     "id": "1234", 
... 
... 
    } 
    } 
} 

謝謝!

回答

0

您是否嘗試過應用列表存儲概念。你有幾個分區讓你傾斜列,就像你提到的10個事件名稱一樣。其餘的,你可以只有一個分區/目錄來保存所有其他密鑰。你可以看看here。其主要針對80-20規則。

0

這是傾斜數據的常見問題,您可以採取幾種方法。

如果偏差在一段時間內保持穩定,列表分段可能會發揮作用,這可能會也可能不會發生,特別是如果引入了新的分區變量值。我還沒有研究過隨着時間的推移調整列表存儲的容易程度,正如您的評論所述,您無法使用它,因爲它是Spark 2.0的一項功能。

如果您使用的是1.6.x,關鍵的觀察是您可以創建自己的函數,將每個事件名稱映射爲600個唯一值之一。您可以將其作爲UDF或案例表達式來執行。然後,您只需使用該功能創建一個列,然後使用該列進行分區,使用repartition(600, 'myPartitionCol)而不是coalesce(600)

由於我們在Swoop處理了非常歪斜的數據,我發現以下主要的數據結構對於構建與分區相關的工具非常有用。

/** Given a key, returns a random number in the range [x, y) where 
    * x and y are the numbers in the tuple associated with a key. 
    */ 
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable { 
    private val r = new java.util.Random() // Scala Random is not serializable in 2.10 

    def apply(key: A): Int = { 
    val (start, end) = m(key) 
    start + r.nextInt(end - start) 
    } 

    override def toString = s"RandomRangeMap($r, $m)" 
} 

例如,這裏是我們如何建立一個分區的情況會稍有不同:一個在數據偏斜和密鑰的數量少,所以我們必須增加分區的數量爲歪斜鍵,在1堅持爲每個鍵分區的最小數目:

/** Partitions data such that each unique key ends in P(key) partitions. 
    * Must be instantiated with a sequence of unique keys and their Ps. 
    * Partition sizes can be highly-skewed by the data, which is where the 
    * multiples come in. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner { 
    private val rrm = new RandomRangeMap(
    keyMap.keys 
     .zip(
     keyMap.values 
      .scanLeft(0)(_+_) 
      .zip(keyMap.values) 
      .map { 
      case (start, count) => (start, start + count) 
      } 
    ) 
     .toMap 
) 

    override val numPartitions = 
    keyMap.values.sum 

    override def getPartition(key: Any): Int = 
    rrm(key) 
} 

object ByKeyPartitionerWithMultiples { 

    /** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
    def udf(keyMap: Map[String, Int]) = { 
    val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]]) 
    (key:String) => partitioner.getPartition(key) 
    } 

} 

你的情況,你有幾個事件名稱合併爲一個分區,這需要改變,但我希望上面的代碼給你一個想法如何來解決這個問題。

最後一個觀察結果是,如果隨着時間的推移,事件名稱的分佈在您的數據中有很大的價值,您可以對數據的某些部分執行統計數據收集傳遞以計算映射表。你不必一直這樣做,只是在需要的時候。要確定這一點,您可以查看每個分區中的輸出文件的行數和/或大小。換句話說,整個過程可以作爲Spark作業的一部分自動執行。

+0

感謝Sim的細節。 – vijay

+1

如果重新分區是通過計算列(eventName的映射)完成的,那麼通過eventName(即WHERE eventName ==「foo」)篩選的查詢仍然只能讀取相關分區而不執行全表掃描,因爲它現在不再是eventName分區了? – vijay

+0

只有在完全過濾分區列時,纔會發生最有效的加載。如果您的偏差在一段時間內保持穩定,則使用靜態映射(無論它可能是什麼;不一定是列表桶),並在查詢過程中應用相同的功能。如果您的偏差隨時間推移不穩定,則需要隨時間分別維護事件到分區映射的數據結構,在您正在查詢的時間段內進行聯合,並通過分區列對兩者進行過濾(以有效減少分區)和事件名稱(專注於分區)。 – Sim