2015-09-08 68 views
2

我是Spark和Cassandra的新手。Cassandra的火花任務

我們在Cassandra上面使用Spark來讀取數據,因爲我們有要求使用非主鍵列讀取數據。

一個觀察是,火花工作任務數量增加w.r.t數據增長。由於這個原因,我們在獲取數據時會面臨很多延遲。

火花任務計數增加的原因是什麼?

什麼應該考慮用Cassandra提高Spark的性能?

請給我建議。

謝謝,
Mallikarjun

+0

您使用的是什麼版本的Spark&Cassandra? – Gillespie

+0

我們使用Cassandra 2.1.5和Spark 1.4.0 –

回答

3

輸入分割大小由配置spark.cassandra.input.split.size_in_mb控制。每個分割都將在Spark中生成任務,因此,Cassandra中的數據越多,處理的時間就越長(這是您所期望的)

要提高性能,請確保使用joinWithCassandraTable來對齊分區。不要使用context.cassandraTable(...),除非您絕對需要表格中的所有數據,並使用select優化檢索的數據來僅投影所需的列。

如果您需要來自某些行的數據,建立一個存儲這些行的ID的輔助表格是有意義的。

二級索引也可以幫助選擇數據的子集,但我已經看到關於如果不是高性能的報告。

1

火花任務計數增加的原因是什麼?

從maasgs答案繼,而不是在SparkConf設置spark.cassandra.input.split.size_in_mb.,它可以在一個單一的工作,從不同keyspaces /數據中心閱讀時使用的ReadConf配置有用:

val readConf = ReadConf(
     splitCount = Option(500), 
      splitSizeInMB = 64, 
      fetchSizeInRows = 1000, 
      consistencyLevel = ConsistencyLevel.LOCAL_ONE, 
      taskMetricsEnabled = true 
     ) 

    val rows = sc.cassandraTable(cassandraKeyspace, cassandraTable).withReadConf(readConf) 

應該考慮如何提高Spark的性能 Cassandra?

就提高性能而言,這取決於您正在運行的作業和所需的轉換類型。下面概述了一些可最大限度提高Spark-Cassandra性能的常規建議(如可找到here)。

您所選擇的操作及其應用順序對於性能至關重要。

您必須牢記您的任務分配和記憶來組織您的流程。

首先要確定您的數據是否被正確分區。這個上下文中的分區僅僅是一個數據塊。如果可能的話,在Spark之前分割你的數據,甚至攝取它。如果這不可行或不可行,您可以選擇在加載後立即重新分區數據。您可以重新分區以增加分區數量或合併以減少分區數量。

分區的數量應該是一個下限,至少是將要對數據進行操作的核心數量的兩倍。話雖如此,您還需要確保您執行的任何任務至少需要100ms才能證明整個網絡的分佈。請注意,重新分配總是會導致混洗,而融合通常不會。如果你和MapReduce一起工作,你就知道洗牌是大部分時間都在真正的工作中。

過早過濾並經常過濾。假設數據源未經過預處理以減少數據量,那麼最初和最好的地方是減少Spark所需要處理的數據量就是初始數據查詢。這通常通過添加where子句來實現。請勿攜帶任何不必要的數據來獲得您的目標結果。引入任何額外的數據將影響整個網絡中有多少數據可能被混洗,並寫入磁盤。不必要的移動數據是一個真正的殺手鐗,應該不惜一切代價避免

在每一步中,您應該尋找機會,以儘可能多地過濾,清除,減少或聚合數據,然後再繼續操作。

儘可能地使用管道。流水線是一系列轉換,它們代表對一部分數據的獨立操作,並且不需要整體對數據進行重新組織(洗牌)。例如:來自字符串 - >字符串長度的映射是獨立的,其中按值排序需要與其他數據元素進行比較並通過網絡重新組織數據(混洗)。

在需要洗牌的作業中,看看在洗牌步驟之前是否可以使用部分聚合或縮減(類似於MapReduce中的組合器)。這將減少洗牌階段的數據移動。

一些昂貴且需要洗牌的常見任務是按鍵分組,按鍵減少。這些操作要求將數據與其他昂貴的數據元素進行比較。瞭解Spark API非常重要,可以選擇最佳的轉換組合以及將它們放置在工作中的位置。創建回答問題所需的最簡單和最有效的算法。