2017-04-22 47 views

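Why is my Cassandra prepared statement data ingestion so slow?

I have a Java list of 100,000 names that I want to ingest into a 3-node Cassandra cluster running DataStax Enterprise 5.1 with Cassandra 3.10.0.

My code ingests the list, but it takes a very long time. I ran a stress test on the cluster and was able to do over 25,000 writes per second. With my ingestion code I get a terrible rate of about 200 writes per second.

My Java List contains 100,000 names and is called myList. I use the following prepared statement and session execute call to ingest the data.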

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

int id = 0;

for (int i = 0; i < myList.size(); i++) {
    id += 1;
    session.execute(prepared.bind(id, myList.get(i)));
}

I added a cluster monitor to my code to see what is going on. Here is the monitoring code.

// Monitoring status of the cluster
final LoadBalancingPolicy loadBalancingPolicy =
        cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
scheduled.scheduleAtFixedRate(() -> {
    Session.State state = session.getState();
    state.getConnectedHosts().forEach((host) -> {
        HostDistance distance = loadBalancingPolicy.distance(host);
        int connections = state.getOpenConnections(host);
        int inFlightQueries = state.getInFlightQueries(host);
        System.out.printf("%s connections=%d, current load=%d, maxload=%d%n",
                host, connections, inFlightQueries,
                connections * poolingOptions.getMaxRequestsPerConnection(distance));
    });
}, 5, 5, TimeUnit.SECONDS);

The monitor prints every 5 seconds. Three iterations of its output look like this:

/192.168.20.25:9042 connections=1, current load=1, maxload=32768 
/192.168.20.26:9042 connections=1, current load=0, maxload=32768 
/192.168.20.34:9042 connections=1, current load=0, maxload=32768 
/192.168.20.25:9042 connections=1, current load=1, maxload=32768 
/192.168.20.26:9042 connections=1, current load=0, maxload=32768 
/192.168.20.34:9042 connections=1, current load=0, maxload=32768 
/192.168.20.25:9042 connections=1, current load=0, maxload=32768 
/192.168.20.26:9042 connections=1, current load=1, maxload=32768 
/192.168.20.34:9042 connections=1, current load=0, maxload=32768 

It doesn't look like I'm using my cluster very effectively. I'm not sure what I'm doing wrong and would greatly appreciate any tips.

Thanks!

Answers

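Use executeAsync.

From the driver documentation: Executes the provided query asynchronously. This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the ResultSetFuture.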
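
A rough way to see why this matters for the loop in the question: session.execute blocks until the response arrives, so only one request is ever in flight (the monitor output shows a load of 0 or 1 per host). At about 200 writes per second, that works out to roughly 5 ms per round trip. The sketch below is only back-of-the-envelope arithmetic. The helper name maxRequestsPerSecond is made up for illustration, and it assumes the per-request latency stays at 5 ms under concurrency, which a real cluster will not guarantee.

```java
class ThroughputEstimate {
    // Upper bound on requests per second when at most `inFlight` requests
    // are outstanding and each one takes `latencyMillis` to complete.
    // Hypothetical helper, not part of any driver API.
    static double maxRequestsPerSecond(int inFlight, double latencyMillis) {
        return inFlight * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // Blocking execute(): one request in flight at a time.
        System.out.println(maxRequestsPerSecond(1, 5.0));   // prints 200.0
        // 100 concurrent requests at the same (assumed) 5 ms latency.
        System.out.println(maxRequestsPerSecond(100, 5.0)); // prints 20000.0
    }
}
```

So the 200 writes per second is most likely a limit of the one-at-a-time loop rather than of the cluster, and keeping more requests in flight is what lifts it.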

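You are inserting a large amount of data. If you use executeAsync and your cluster cannot keep up with that many concurrent requests, it may throw exceptions. You can throttle executeAsync with a Semaphore.

Example: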

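PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

// Upper bound on the number of in-flight asynchronous inserts.
int numberOfConcurrentQueries = 100;
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);

int id = 0;

for (int i = 0; i < myList.size(); i++) {
    id += 1;
    try {
        // Blocks while numberOfConcurrentQueries inserts are already in flight.
        semaphore.acquire();
    } catch (InterruptedException e) {
        // No permit was acquired here, so there is nothing to release.
        Thread.currentThread().interrupt();
        break;
    }
    try {
        ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i)));
        // Note: recent Guava versions require an executor as a third argument.
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                // Free the permit and report the failed insert.
                semaphore.release();
                t.printStackTrace();
            }
        });
    } catch (Exception e) {
        // executeAsync failed before a callback was registered.
        semaphore.release();
        e.printStackTrace();
    }
}

// Wait for the remaining in-flight inserts to finish.
semaphore.acquireUninterruptibly(numberOfConcurrentQueries);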

Sources:
https://stackoverflow.com/a/30526719/2320144
https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver.core.Statement-
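
If you want to try the semaphore throttling pattern without a cluster, here is a minimal, self-contained sketch using only the JDK. Everything in it is an assumption for illustration: the class ThrottledAsyncDemo, the fakeCluster thread pool, and simulatedWrite (which just sleeps for 1 ms) stand in for the driver and are not part of the DataStax API. It records the highest number of simulated writes in flight at once, which should never exceed the limit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledAsyncDemo {
    /**
     * Submits `total` simulated writes with at most `maxInFlight` outstanding
     * and returns the highest number of writes observed in flight at once.
     */
    static int run(int total, int maxInFlight) {
        // Hypothetical stand-in for the cluster: a pool that "writes" by sleeping.
        ExecutorService fakeCluster = Executors.newFixedThreadPool(32);
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks once maxInFlight writes are outstanding.
                permits.acquireUninterruptibly();
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                CompletableFuture
                        .runAsync(ThrottledAsyncDemo::simulatedWrite, fakeCluster)
                        .whenComplete((ignored, error) -> {
                            // Runs on success and on failure alike.
                            inFlight.decrementAndGet();
                            permits.release();
                        });
            }
            // Reclaiming every permit means every write has completed.
            permits.acquireUninterruptibly(maxInFlight);
            return peak.get();
        } finally {
            fakeCluster.shutdown();
        }
    }

    // Pretends to be a write that takes about 1 ms.
    static void simulatedWrite() {
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight: " + run(1_000, 10));
    }
}
```

The limit itself (100 in the example above, 10 here) is a tuning knob rather than a recommended value; watching the in-flight count from the monitor in the question is one way to pick it.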

Why do you need the id? To count successes? –

id is the partition key – mithrix

@mithrix Answer updated –