2014-12-27 42 views
9

在將9百萬行的批次寫入12節點cassandra(2.1.2)羣集時,spark-cassandra-connector(1.0.4,1.1.0)出現問題。我用一致性ALL編寫並讀取一致性爲ONE,但讀取的行數每次都與900萬(8.865.753,8.753.213等)不同。異步寫入在Cassandra中似乎被破壞

我檢查了連接器的代碼,發現沒有問題。然後,我決定編寫自己的應用程序,獨立於spark和連接器,以調查問題(唯一的依賴是datastax-driver-code version 2.1.3)。

完整的代碼,啓動腳本和配置文件現在可以是found on github

在僞代碼,我寫了兩個不同版本的應用程序,同步一個:

try (Session session = cluster.connect()) { 

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>"; 
    PreparedStatement pstm = session.prepare(cql); 

    for(String partitionKey : keySource) { 
     // keySource is an Iterable<String> of partition keys 

     BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */); 
     bound.setConsistencyLevel(ConsistencyLevel.ALL); 

     session.execute(bound); 
    } 

} 

而異步之一:

try (Session session = cluster.connect()) { 

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>(); 

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>"; 
    PreparedStatement pstm = session.prepare(cql); 

    for(String partitionKey : keySource) { 
     // keySource is an Iterable<String> of partition keys 

     while(futures.size()>=10 /* Max 10 concurrent writes */) { 
      // Wait for the first issued write to terminate 
      ResultSetFuture future = futures.get(0); 
      future.get(); 
      futures.remove(0); 
     } 

     BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */); 
     bound.setConsistencyLevel(ConsistencyLevel.ALL); 

     futures.add(session.executeAsync(bound)); 
    } 

    while(futures.size()>0) { 
     // Wait for the other write requests to terminate 
     ResultSetFuture future = futures.get(0); 
     future.get(); 
     futures.remove(0); 
    } 
} 

最後一個是類似於使用無批處理配置情況下的連接器。

這兩個版本的應用程序在所有情況下都是一樣的,除非負載很高。例如,當在9臺機器(45個線程)上運行5個線程的同步版本時,將9百萬行寫入羣集,我在隨後的讀取中找到所有行(使用spark-cassandra-connector)。

如果我運行異步版本,每臺機器上有1個線程(9個線程),執行速度要快得多,但我無法在隨後的讀取中找到所有行(與spark-cassandra連接器相同的問題) 。

代碼在執行過程中沒有拋出異常。

問題的原因是什麼?

我添加一些其他的結果(評論感謝):

  • 異步版本與9個機9個線程,每個線程5名併發作家(45名併發作家):沒有問題
  • 同步版本與9個機90個線程(每個JVM實例10個線程):沒有問題

問題似乎開始異步引起的寫入和數量的併發作家> 45 < = 90,所以我做了其他測試,以確保該發現是正確的:

  • 將ResultSetFuture的「get」方法替換爲 「getUninterruptibly」:相同的問題。
  • 9臺機器上有18個線程,5個併發的異步版本 每個線程的寫入者(90個併發寫入器):沒有問題

最後的發現表明併發寫入程序(90)的高數量不像第一次測試中預期的那樣是一個問題。問題是使用同一會話的大量異步寫入。

在同一會話中有5個併發異步寫入問題不存在。如果我將併發寫入數增加到10,某些操作會在沒有通知的情況下丟失。

如果您在同一會話上同時發出多個(> 5個)寫入,似乎Cassandra 2.1.2(或Cassandra Java驅動程序)中的異步寫入被破壞。

+0

您是否考慮過使用BATCH語句而不是分別發送每個更新?我知道這並沒有解決你遇到的問題,但它似乎更適合做批量插入。 – Onots

+0

是的,問題也存在於批處理語句中。我沒有使用批處理,因爲它們受到最新版本連接器中修復的spark cassandra連接器中的另一個問題的影響。我已經使用該修補程序的自編譯版本的連接器,並發現相同的問題。 –

+0

我在[github](https://github.com/nibbio84/cassandra-loader-bug-showcase)上添加了所有代碼和配置文件 –

回答

5

尼古拉和我本週末通過電子郵件進行了交流,並認爲我會用我現在的理論提供一個更新。我看了一下Nicola分享的github project,並試驗了EC2上的8節點集羣。

我能夠重現2.1.2的問題,但確實觀察到經過一段時間後,我可以重新執行spark工作,並返回所有9百萬行。

我似乎注意到,儘管節點處於壓縮狀態,但我沒有獲得全部900萬行。一時興起,我看了看change log for 2.1,並觀察到可能解釋此問題的問題CASSANDRA-8429 - "Some keys unreadable during compaction"

看到問題已被修復爲2.1.3的目標,我重新測試了cassandra-2.1分支,並在壓縮活動發生時運行計數工作,並獲得了900萬行。

我想嘗試一下更多,因爲我對cassandra-2.1分支的測試非常有限,壓縮活動可能純屬巧合,但我希望這可以解釋這些問題。

+0

沒有使用2.1.3進行測試,但只有在自動壓縮進行時,我才能確認問題僅出現在水平壓實策略中。隨着大小分層壓實或平穩壓實,Cassandra運作良好。 –

6

幾個可能性:

  • 你的異步例子發出10在一次寫在時間9個線程,因此90中,而您的同步例子只是做45寫入的時間,所以我會嘗試將異步下降到相同的速度,所以這是一個蘋果比較蘋果。

    你不說你是如何檢查與異步方法異常。我看你是使用future.get(),但建議使用getUninterruptibly()如文檔中指出:

    等待查詢返回,並返回其結果。此方法通常比Future.get()方便 ,因爲它:不間斷地等待 結果,所以不會拋出InterruptedException。 返回有意義的異常,而不必處理 ExecutionException。因此,這是獲得未來 結果的首選方式。

    所以也許你沒有看到與你的異步例子發生的寫例外。

  • 另一個不太可能的可能性是,你的keySource出於某種原因返回重複的分區鍵,所以當你執行寫操作時,其中一些最終會覆蓋以前插入的行並且不會增加行數。但是這也會影響同步版本,所以我就說這不太可能。

    我會嘗試寫入比900萬更小的集合,並且速度很慢,並且看看問題是否僅在某個插入次數或特定插入次數開始發生。如果插入次數有影響,那麼我會懷疑數據中的行鍵有問題。如果插入率有影響,那麼我會懷疑熱點導致寫入超時錯誤。

  • 要檢查的另一件事是Cassandra日誌文件,以查看是否有任何異常在那裏報告。

附錄:14年12月30日

我嘗試使用示例代碼與卡桑德拉2.1.2和2.1.3驅動重現症狀。我使用了一個單一的表格和一個遞增數字的關鍵字,這樣我就可以看到數據中的空白。我做了很多異步插入(每個線程一次30個,每個線程在10個線程中全部使用一個全局會話)。然後我做了一個「select count(*)」的表,事實上它報告的表中行數比預期的少。然後我做了一個「select *」並將這些行轉儲到一個文件並檢查丟失的密鑰。它們似乎是隨機分佈的,但是當我查詢那些缺失的單行時,事實證明它們實際上存在於表格中。然後我注意到每次我執行「select count(*)」時,都會返回一個不同的數字,所以它似乎給出了表中的行數的近似值,而不是實際的數字。

因此,我修改了測試程序,在所有寫入之後執行回讀階段,因爲我知道所有的關鍵值。當我這樣做時,所有的異步寫入都出現在表格中。

所以我的問題是,你如何檢查完成後寫在表中的行數?您是在查詢每個單獨的鍵值還是使用某種操作(如「select *」)?如果後者似乎給出了大部分行,但不是全部行,那麼也許你的數據實際上是存在的。由於沒有例外被拋出,它似乎表明寫入都是成功的。另一個問題是,你確定你的關鍵值對於所有900萬行是唯一的。

+0

我沒有使用count(*),因爲它向我展示了錯誤的結果開始。我使用了兩種計算行的方法:1)Spark-cassandra連接器,它在令牌環空間執行多個查詢並總結結果; 2)帶有hadoop mapreduce API的Spark。我注意到兩種方法的相同行爲。 –

+0

我也確定行號是不同的。我多次檢查它們,當我在啓動腳本中將「異步」參數更改爲「同步」時,它的行ID是OK。我也經歷過你正在討論的關於閱讀時間的行爲。發現單行的原因可能是由於:1)讀取修復(如果它們在羣集中啓用)2)每次讀取行時,都可以從不同的節點讀取相對於計數(*)的值。既然你正在寫一致性一致,這不應該發生。 –

+1

您可能想要嘗試爲測試設置1的複製因子,並查看是否可以在異步寫入後查找實際缺少的行。通過單個鍵讀回行是確定行是否丟失的確定性測試,因爲這些其他方法似乎在計數中而不是丟失的行。如果你可以發佈更多的代碼,我可以嘗試重現症狀,但是到目前爲止,當我用一個會話進行大量的異步寫操作時,它們都出現在表中。 –