在將9百萬行的批次寫入12節點cassandra(2.1.2)羣集時,spark-cassandra-connector(1.0.4,1.1.0)出現問題。我用一致性ALL編寫並讀取一致性爲ONE,但讀取的行數每次都與900萬(8.865.753,8.753.213等)不同。異步寫入在Cassandra中似乎被破壞
我檢查了連接器的代碼,發現沒有問題。然後,我決定編寫自己的應用程序,獨立於spark和連接器,以調查問題(唯一的依賴是datastax-driver-code version 2.1.3)。
完整的代碼,啓動腳本和配置文件現在可以是found on github。
在僞代碼,我寫了兩個不同版本的應用程序,同步一個:
try (Session session = cluster.connect()) {
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(bound);
}
}
而異步之一:
try (Session session = cluster.connect()) {
List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
futures.add(session.executeAsync(bound));
}
while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}
最後一個是類似於使用無批處理配置情況下的連接器。
這兩個版本的應用程序在所有情況下都是一樣的,除非負載很高。例如,當在9臺機器(45個線程)上運行5個線程的同步版本時,將9百萬行寫入羣集,我在隨後的讀取中找到所有行(使用spark-cassandra-connector)。
如果我運行異步版本,每臺機器上有1個線程(9個線程),執行速度要快得多,但我無法在隨後的讀取中找到所有行(與spark-cassandra連接器相同的問題) 。
代碼在執行過程中沒有拋出異常。
問題的原因是什麼?
我添加一些其他的結果(評論感謝):
- 異步版本與9個機9個線程,每個線程5名併發作家(45名併發作家):沒有問題
- 同步版本與9個機90個線程(每個JVM實例10個線程):沒有問題
問題似乎開始異步引起的寫入和數量的併發作家> 45 < = 90,所以我做了其他測試,以確保該發現是正確的:
- 將ResultSetFuture的「get」方法替換爲 「getUninterruptibly」:相同的問題。
- 9臺機器上有18個線程,5個併發的異步版本 每個線程的寫入者(90個併發寫入器):沒有問題。
最後的發現表明併發寫入程序(90)的高數量不像第一次測試中預期的那樣是一個問題。問題是使用同一會話的大量異步寫入。
在同一會話中有5個併發異步寫入問題不存在。如果我將併發寫入數增加到10,某些操作會在沒有通知的情況下丟失。
如果您在同一會話上同時發出多個(> 5個)寫入,似乎Cassandra 2.1.2(或Cassandra Java驅動程序)中的異步寫入被破壞。
您是否考慮過使用BATCH語句而不是分別發送每個更新?我知道這並沒有解決你遇到的問題,但它似乎更適合做批量插入。 – Onots
是的,問題也存在於批處理語句中。我沒有使用批處理,因爲它們受到最新版本連接器中修復的spark cassandra連接器中的另一個問題的影響。我已經使用該修補程序的自編譯版本的連接器,並發現相同的問題。 –
我在[github](https://github.com/nibbio84/cassandra-loader-bug-showcase)上添加了所有代碼和配置文件 –