2013-10-05 34 views
6

我打算使用Datastax Java驅動程序寫入Cassandra ..我主要感興趣的是Batch WritesAsycnhronous Datastax java驅動程序的功能,但我無法獲得任何教程可以解釋我如何將在使用Datastax Java驅動程序我下面的代碼,這些功能..如何使用Datastax Java驅動程序的異步/批量寫入功能

/** 
* Performs an upsert of the specified attributes for the specified id. 
*/ 
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) { 

    try { 

     // make a sql here using the above input parameters. 

     String sql = sqlPart1.toString()+sqlPart2.toString(); 

     DatastaxConnection.getInstance(); 
     PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql); 
     prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);   

     BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()])); 

     DatastaxConnection.getSession().execute(query); 

    } catch (InvalidQueryException e) { 
     LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e); 
    } catch (Exception e) { 
     LOG.error("Exception in DatastaxClient::upsertAttributes "+e); 
    } 
} 

在下面的代碼中,我創建一個連接到卡桑德拉使用Datastax Java驅動程序節點。

/** 
* Creating Cassandra connection using Datastax Java driver 
* 
*/ 
private DatastaxConnection() { 

    try{ 
     builder = Cluster.builder(); 
     builder.addContactPoint("some_nodes"); 

     builder.poolingOptions().setCoreConnectionsPerHost(
       HostDistance.LOCAL, 
       builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL)); 

     cluster = builder 
       .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
       .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) 
       .build(); 

     StringBuilder s = new StringBuilder(); 
     Set<Host> allHosts = cluster.getMetadata().getAllHosts(); 
     for (Host h : allHosts) { 
      s.append("["); 
      s.append(h.getDatacenter()); 
      s.append(h.getRack()); 
      s.append(h.getAddress()); 
      s.append("]"); 
     } 
     System.out.println("Cassandra Cluster: " + s.toString()); 

     session = cluster.connect("testdatastaxks"); 

    } catch (NoHostAvailableException e) { 
     e.printStackTrace(); 
     throw new RuntimeException(e); 
    } catch (Exception e) { 

    } 
} 

任何人可以幫助我如何批量寫入或異步功能添加到我上面的代碼..感謝您的幫助..

我正在卡桑德拉1.2.9

回答

8

對於非同步它就這麼簡單使用executeAsync功能:

... 
DatastaxConnection.getSession().executeAsync(query); 

對於批,你需要建立查詢(我使用字符串,因爲編譯器知道如何優化串孔卡tenation真的很好):

String cql = "BEGIN BATCH " 
     cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); "; 
     cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); "; 
     cql += "APPLY BATCH; " 

DatastaxConnection.getInstance(); 
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql); 
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);   

// this is where you need to be careful 
// bind expects a comma separated list of values for all the params (?) above 
// so for the above batch we need to supply 4 params:      
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2"); 

DatastaxConnection.getSession().execute(query); 

在一個側面說明,我覺得你的聲明可能會是這個樣子的結合,假設您更改屬性圖的列表,其中每個地圖代表的更新/插入批次內:

BoundStatement query = prepStatement.bind(userId, 
              attributesList.get(0).values().toArray(new Object[attributes.size()]), 
              userId_2, 
              attributesList.get(1).values().toArray(new Object[attributes.size()])); 
+0

是否有辦法用命名參數執行此操作? – Highstead

+1

@Highstead什麼編程語言?上面是java所以([排序](http://java.dzone.com/articles/named-parameters-java)) –

+0

我專注於python,但我認爲是否有辦法在一個另一方面會有辦法做到這一點。舊的cql驅動程序支持它,但已被棄用。所以我正在尋找替代功能。 – Highstead

5

對於Lyuben的答案提供的例子中,設置了一批像Type.COUNTER的某些屬性(如果你需要更新計數器)使用字符串將無法正常工作。相反,您可以像批准那樣安排您準備好的對賬單:

final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);"; 
final PreparedStatement prepared = session.prepare(insertQuery); 

final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED); 
batch.add(prepared.bind(userId1, "something")); 
batch.add(prepared.bind(userId2, "another")); 
batch.add(prepared.bind(userId3, "thing")); 

session.executeAsync(batch); 
+1

我比接受的答案更喜歡這個。這裏批處理的內容可以是動態的(對比固定的CQL和被接受的答案中的參數數量) – 0cd

相關問題