2017-08-29 50 views
-1

我嘗試使用 java-driver-async-queries執行異步查詢。我正在修改FutureCallback中的列表,但似乎不起作用 -使用ResultSetFuture修改集合

List<Product> products = new ArrayList<Product>(); 

for (// iterating over a Map) { 
    key = entry.getKey(); 
    String query = "SELECT id,desc,category FROM products where id=?"; 
    ResultSetFuture future = session.executeAsync(query, key); 
    Futures.addCallback(future, 
     new FutureCallback<ResultSet>() { 
      @Override public void onSuccess(ResultSet result) { 
       Row row = result.one(); 
       if (row != null) { 
        Product product = new Product(); 
        product.setId(row.getString("id")); 
        product.setDesc(row.getString("desc")); 
        product.setCategory(row.getString("category")); 

        products.add(product); 
       } 
      } 

      @Override public void onFailure(Throwable t) { 
       // log error 
      } 
     }, 
     MoreExecutors.sameThreadExecutor() 
    ); 
} 

System.out.println("Product List : " + products); // Not printing correct values. Sometimes print blank 

是否有其他方法?

基於米哈伊爾Baksheev答案我執行,現在得到正確的結果。 只是一個轉折點。我需要實現一些額外的邏輯。我想知道如果我可以使用List<MyClass>代替List<ResultSetFuture>和MyClass的作爲 -

public class MyClass { 

    private Integer   productCount; 
    private Integer   stockCount; 
    private ResultSetFuture result; 
} 

然後同時迭代設置FutureList爲 -

ResultSetFuture result = session.executeAsync(query, key.get()); 
MyClass allResult = new MyClass(); 
allResult.setInCount(inCount); 
allResult.setResult(result); 
allResult.setSohCount(values.size() - inCount); 

futuresList.add(allResult); 
+0

什麼是「不工作」的定義是什麼?發佈預期行爲和實際行爲。 –

+1

你是不是在等你的未來?看起來你創建了一堆期貨,並立即打印結果結構。結果結構尚未填充,因爲大部分期貨仍在運行。 – RussS

+0

謝謝。什麼是修正。有沒有我可以參考的代碼示例? – Saurabh

回答

1

正如@RussS提到,是不是等待所有期貨交易代碼完成。

有許多方法可以同步異步代碼。例如,使用CountDownLatch

編輯: 也請使用separte線程回調和使用併發收集產品。

ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>(); 
final Executor callbackExecutor = Executors.newSingleThreadExecutor(); 
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/); 
for (// iterating over a Map) { 
    key = entry.getKey(); 
    String query = "SELECT id,desc,category FROM products where id=?"; 
    ResultSetFuture future = session.executeAsync(query, key); 
    Futures.addCallback(future, 
     new FutureCallback<ResultSet>() { 
      @Override public void onSuccess(ResultSet result) { 
       Row row = result.one(); 
       if (row != null) { 
        Product product = new Product(); 
        product.setId(row.getString("id")); 
        product.setDesc(row.getString("desc")); 
        product.setCategory(row.getString("category")); 

        products.add(product); 
       } 
       doneSignal.countDown(); 

      } 

      @Override public void onFailure(Throwable t) { 
       // log error 
       doneSignal.countDown(); 
      } 
     }, 
     callbackExecutor 
    ); 
} 

doneSignal.await();   // wait for all async requests to finish 
System.out.println("Product List : " + products); 

另一種方法是收集所有期貨清單,然後等待所有結果作爲一個未來番石榴的Futures.allAsList,如:

List<ResultSetFuture> futuresList = new ArrayList<>(/*Map size*/); 
     for (/* iterating over a Map*/) { 
      key = entry.getKey(); 
      String query = "SELECT id,desc,category FROM products where id=?"; 
      futuresList.add(session.executeAsync(query, key)); 
     } 

     ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList(futuresList); 
     List<Product> products = new ArrayList<>(); 
     try { 
      final List<ResultSet> resultSets = allFuturesResult.get(); 
      for (ResultSet rs : resultSets) { 
       if (null != rs) { 
        Row row = rs.one(); 
        if (row != null) { 
         Product product = new Product(); 
         product.setId(row.getString("id")); 
         product.setDesc(row.getString("desc")); 
         product.setCategory(row.getString("category")); 

         products.add(product); 
        } 
       } 
      } 
     } catch (InterruptedException | ExecutionException e) { 
      System.out.println(e); 
     } 
     System.out.println("Product List : " + products); 

EDIT 2

我想知道如果我可以使用List而不是List和MyClass作爲

技術上是可以的,但你不能在這種情況下,通過在Futures.allAsListList<MyClass>MyClass應該實現ListenableFuture接口

+0

謝謝。我有一個問題在cassandra中做嵌套查詢,並按照您的回覆https://stackoverflow.com/questions/45471519/improve-performance-in-cassandra-and-java-collections。現在變得間歇性的問題,因爲有時我的產品列表變空了。但是,當我在onSuccess方法中打印列表時,它給了我結果。如果我使用await(),那麼方法花費很長時間。請建議。 – Saurabh

+0

@Saurabh,我已經提出了我的回答 –

+0

我編輯了我的問題。請看一看。 – Saurabh