我有一個簡單的Spark作業,它從5節點的Cassandra集羣讀取500m行,總是運行6個任務,由於每個任務的大小而導致寫入問題。我試着調整了input_split_size,這似乎沒有效果。目前,我不得不重新對錶格掃描進行重新分區,這是不理想的,因爲它很昂貴。在Cassandra表掃描上設置Spark任務的數量
閱讀了幾篇文章後,我嘗試增加啓動腳本中的num-executors(下面),雖然這沒有效果。
如果沒有辦法設置Cassandra桌面掃描的任務數量,那很好,我會做..但我有這種不斷的小竅門,我在這裏失去了一些東西。
Spark工作人員居住在C *節點上,這些節點是8核心,64GB服務器,每臺服務器帶有2TB SSD。
...
val conf = new SparkConf(true).set("spark.cassandra.connection.host",
cassandraHost).setAppName("rowMigration")
conf.set("spark.shuffle.memoryFraction", "0.4")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", "15G")
conf.set("spark.cassandra.input.split.size_in_mb", "32") //default 64mb
conf.set("spark.cassandra.output.batch.size.bytes", "1000") //default
conf.set("spark.cassandra.output.concurrent.writes", "5") //default
val sc = new SparkContext(conf)
val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
.select("accountid", "userid", "eventname", "eventid", "eventproperties")
.filter(row=>row.getString("accountid").equals("someAccount"))
.repartition(100)
val object = rawEvents
.map(ele => (ele.getString("userid"),
UUID.randomUUID(),
UUID.randomUUID(),
ele.getUUID("eventid"),
ele.getString("eventname"),
"event type",
UUIDs.unixTimestamp(ele.getUUID("eventid")),
ele.getMap[String, String]("eventproperties"),
Map[String, String](),
Map[String, String](),
Map[String, String]()))
.map(row=>MyObject(row))
Object.saveToCassandra(targetCassandraKeyspace,eventTable)
啓動腳本:
#!/bin/bash
export SHADED_JAR="Migrate.jar"
export SPARKHOME="${SPARKHOME:-/opt/spark}"
export SPARK_CLASSPATH="$SHADED_JAR:$SPARK_CLASSPATH"
export CLASS=com.migration.migrate
"${SPARKHOME}/bin/spark-submit" \
--class "${CLASS}" \
--jars $SHADED_JAR,$SHADED_JAR \
--master spark://cas-1-5:7077 \
--num-executors 15 \
--executor-memory 20g \
--executor-cores 4 "$SHADED_JAR" \
--worker-cores 20 \
-Dcassandra.connection.host=10.1.20.201 \
-Dzookeeper.host=10.1.20.211:2181 \
編輯 - 繼彼得的回答是:
我已經設置sc.cassandraTable的ReadConf.splitCount如下,但是這並沒有改變多少生成的任務,這意味着我仍然需要重新分區表掃描。我開始認爲我在考慮這個錯誤,重新分配是必要的。目前這項工作大約需要1.5小時,並將表掃描重新分區爲1000個大約10MB的任務,從而將寫入時間縮短爲數分鐘。
val cassReadConfig = new ReadConf {
ReadConf.apply(splitCount = Option(1000)
)
}
val sc = new SparkContext(conf)
val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
.withReadConf(readConf = cassReadConfig)
這也不適合我。我做了你所做的。你能解釋一下,表格掃描*重新分配的意義以及如何去做? – Abhidemon