2015-07-28 32 views
2

我有一個簡單的Spark作業,它從5節點的Cassandra集羣讀取500m行,總是運行6個任務,由於每個任務的大小而導致寫入問題。我試着調整了input_split_size,這似乎沒有效果。目前,我不得不重新對錶格掃描進行重新分區,這是不理想的,因爲它很昂貴。在Cassandra表掃描上設置Spark任務的數量

閱讀了幾篇文章後,我嘗試增加啓動腳本中的num-executors(下面),雖然這沒有效果。

如果沒有辦法設置Cassandra桌面掃描的任務數量,那很好,我會做..但我有這種不斷的小竅門,我在這裏失去了一些東西。

Spark工作人員居住在C *節點上,這些節點是8核心,64GB服務器,每臺服務器帶有2TB SSD。

... 
val conf = new SparkConf(true).set("spark.cassandra.connection.host", 
cassandraHost).setAppName("rowMigration") 
    conf.set("spark.shuffle.memoryFraction", "0.4") 
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    conf.set("spark.executor.memory", "15G") 
    conf.set("spark.cassandra.input.split.size_in_mb", "32") //default 64mb 
    conf.set("spark.cassandra.output.batch.size.bytes", "1000") //default 
    conf.set("spark.cassandra.output.concurrent.writes", "5") //default 

val sc = new SparkContext(conf) 

val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable) 
    .select("accountid", "userid", "eventname", "eventid", "eventproperties") 
    .filter(row=>row.getString("accountid").equals("someAccount")) 
    .repartition(100) 

val object = rawEvents 
    .map(ele => (ele.getString("userid"), 
    UUID.randomUUID(), 
    UUID.randomUUID(), 
    ele.getUUID("eventid"), 
    ele.getString("eventname"), 
    "event type", 
    UUIDs.unixTimestamp(ele.getUUID("eventid")), 
    ele.getMap[String, String]("eventproperties"), 
    Map[String, String](), 
    Map[String, String](), 
    Map[String, String]())) 
    .map(row=>MyObject(row)) 

Object.saveToCassandra(targetCassandraKeyspace,eventTable) 

啓動腳本:

#!/bin/bash 
export SHADED_JAR="Migrate.jar" 
export SPARKHOME="${SPARKHOME:-/opt/spark}" 
export SPARK_CLASSPATH="$SHADED_JAR:$SPARK_CLASSPATH" 
export CLASS=com.migration.migrate 
"${SPARKHOME}/bin/spark-submit" \ 
     --class "${CLASS}" \ 
     --jars $SHADED_JAR,$SHADED_JAR \ 
     --master spark://cas-1-5:7077 \ 
     --num-executors 15 \ 
     --executor-memory 20g \ 
     --executor-cores 4 "$SHADED_JAR" \ 
     --worker-cores 20 \ 
     -Dcassandra.connection.host=10.1.20.201 \ 
     -Dzookeeper.host=10.1.20.211:2181 \ 

編輯 - 繼彼得的回答是:

我已經設置sc.cassandraTable的ReadConf.splitCount如下,但是這並沒有改變多少生成的任務,這意味着我仍然需要重新分區表掃描。我開始認爲我在考慮這個錯誤,重新分配是必要的。目前這項工作大約需要1.5小時,並將表掃描重新分區爲1000個大約10MB的任務,從而將寫入時間縮短爲數分鐘。

val cassReadConfig = new ReadConf { 
     ReadConf.apply(splitCount = Option(1000) 
     ) 
    } 

    val sc = new SparkContext(conf) 

    val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable) 
    .withReadConf(readConf = cassReadConfig) 
+0

這也不適合我。我做了你所做的。你能解釋一下,表格掃描*重新分配的意義以及如何去做? – Abhidemon

回答

3

由於spark連接器1.3,分割大小是根據system.Size_estimates Cassandra表估計的,自Cassandra 2.1.5以來可用。該表由Cassandra定期刷新,並且在加載/刪除新數據或加入新節點後不久,其內容可能不正確。檢查那裏的估計值是否反映了您的數據量。這是一個相對較新的功能,所以它也很可能存在一些錯誤。

如果估算錯誤,或者您運行的是較早的Cassandra,我們有權重設自動拆分大小調整。 sc.cassandraTable接受ReadConf參數,您可以在其中設置splitCount,這會強制固定數量的分割。

至於split_size_in_mb參數,確實在項目源代碼中存在一段時間的錯誤,但在發佈到發佈到maven的任何版本之前,它已被修復。所以除非你編譯來自(舊)源的連接器,否則你不應該擊中它。

+0

感謝您的信息!我測試過將ReadConf參數設置爲我們當前使用的C * 2.1.4。這裏調整splitCount或splitSizeInMB並不會改變任務的數量。我將升級到2.1.5並查看system.size_esitmates表是否有任何影響。 – Gillespie

0

似乎有split.size_in_mb參數的錯誤。該代碼可能會將其解釋爲字節而不是兆字節,因此請嘗試將32更改爲更大。請參閱答案here中的示例。

+0

謝謝,雖然這個bug已經修復了1.4.0版 - 我目前正在使用它。如果發生這種情況,我會在運行的任務數量上看到完全相反的結果。 – Gillespie