的我用DSE 4.5.3DataStax企業:saveToCassandra產生大量的暗示切換
我有8個節點(相當強大的節點)的集羣是從火花到卡桑德拉與數據產生的麻煩,我想從火花中產生一些測試數據。
我的spark工作是從cassandra表(它代表了一天的數據)中讀取5M行,然後將它們緩存到內存中(每個節點的內存爲32 GB,所以沒問題),最後將它們保存n次在另一個cassandra表中,模擬更多天的數據。
val table = sc.cassandraTable[RecordData]("data", "one_day").cache
val firstDate = table.first.gets_dt_tm
val start = 1
val end = 10
for(i <- start to end){
table.map(row => {
//modify row to increment row timestamp day according to i
java.lang.Thread sleep 2
row
}).saveToCassandra("data","ten_days")
}
我也放慢了寫入過程的睡眠,但沒有幫助。問題是,在我的集羣中,我得到了很多提示,我不得不連續修復節點。請記住,我需要生成600天的數據。
這是我的表
CREATE TABLE ten_days(
YEAR int,
MONTH int,
DAY int,
ID decimal,
... other fields
S_DT_TM timestamp,
PRIMARY KEY ((ID,C_TRX_REF),YEAR,MONTH,DAY,S_DT_TM));
ID的結構和C_TRX_REF是一天中唯一的密鑰,但不能跨多天。 (ID,C_TRX_REF)的不同計數是5M。
S_DT_TM是第二個分辨率的時間戳,所以它在我的數據集中不是唯一的。
爲什麼spark寫入cassandra會產生提示?你需要更多信息嗎? 什麼是從火花寫入數百萬行到cassandra的最佳實踐?
感謝
謝謝拉斯,你是在睡覺!我的SparkConf目前是val conf = new SparkConf(true) .set(「spark.cassandra.connection.host」,「172.17.52.30」) .set(「spark.cassandra.auth.username」,「cassandra 「) .set(」spark.cassandra.auth.password「,」cassandra「) .set(」spark.cassandra.output.batch.size.rows「,」5120「) .set(」spark.cassandra .output.batch.size.bytes「,」262144「) .set(」spark.cassandra.output.concurrent.writes「,」10「) – 2014-11-21 20:56:42
但我仍然收到類似這些批准的[data2。 ten_days]的大小是149231,超過了指定的5120的閾值144111 – 2014-11-21 21:04:58
只能使用批量字節並將其設置爲5120 – RussS 2014-11-21 21:20:33