2014-09-12 43 views
1

我有這樣的代碼,一旦ArrayBlockingQueue填充了配額,就會將文檔轉儲到MongoDB中。當我運行代碼時,它似乎只運行一次,然後給我一個堆棧跟蹤。我的猜測是,BulkWriteOperation有人必須「重置」或重新開始。嘗試使用MongoDB時發生java.lang.IllegalStateException BulkWriteOperation

另外,我在構造函數創建BulkWriteOperations ...

bulkEvent = eventsCollection.initializeOrderedBulkOperation(); 
bulkSession = sessionsCollection.initializeOrderedBulkOperation(); 

這裏的堆棧跟蹤。

10 records inserted 
java.lang.IllegalStateException: already executed 
    at org.bson.util.Assertions.isTrue(Assertions.java:36) 
    at com.mongodb.BulkWriteOperation.insert(BulkWriteOperation.java:62) 
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.dumpQueue(MongoDBManager.java:104) 
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.addToQueue(MongoDBManager.java:85) 

下面是隊列代碼:

public void addToQueue(Object item) { 
     if (item instanceof SakaiEvent) { 
      if (eventQueue.offer((SakaiEvent) item)) { 
      } else { 
       dumpQueue(eventQueue); 
      } 

     } 
     if (item instanceof SakaiSession) { 
      if (sessionQueue.offer((SakaiSession) item)) { 
      } else { 
       dumpQueue(sessionQueue); 
      } 
     } 

    } 

這裏是從隊列中讀取,並將它們添加到BulkWriteOperation(initializeOrderedBulkOperation)來執行,然後將其轉儲到數據庫的代碼。只有10個文件被寫入,然後失敗。

private void dumpQueue(BlockingQueue q) { 
     Object item = q.peek(); 
     Iterator itty = q.iterator(); 
     BulkWriteResult result = null; 

     if (item instanceof SakaiEvent) { 
      while (itty.hasNext()) { 
       bulkEvent.insert(((SakaiEvent) itty.next()).convertToDBObject()); 
       //It's failing at that line^^ 
      } 
      result = bulkEvent.execute(); 

     } 
     if (item instanceof SakaiSession) { 
      while (itty.hasNext()) { 
       bulkSession.insert(((SakaiSession) itty.next()).convertToDBObject()); 
      } 
      result = bulkSession.execute(); 
     } 

     System.out.println(result.getInsertedCount() + " records inserted"); 
    } 

回答

4

general documentation適用於這種情況下的所有驅動程序實現:

「執行後,您不能重新執行批量()對象,而無需重新初始化。」

所以.execute()方法有效地「漏」已發送給它操作的當前列表現在包含的命令是如何實際發送狀態信息。因此,您無法添加更多條目,或者在同一實例上再次調用.execute()而不重新初始化。

等你以後調用execute每個「批量」的對象,則需要再次調用intialize:

bulkEvent = eventsCollection.initializeOrderedBulkOperation(); 
bulkSession = sessionsCollection.initializeOrderedBulkOperation(); 

每條這樣的線的放置在你的每個函數調用.execute()後再次repectively。然後進一步調用這些實例可以添加操作並再次執行循環。

請注意,「批量」操作對象將存儲您想要放入它們的項目數量,但會將對服務器的請求分解爲最多1000個項目。執行後,操作列表的狀態將反映如何完成這項工作,如果你想檢查。

+0

這就是我原來的想法,一定是錯過了它的文檔。那麼我會重寫代碼以反映這一點。 我也知道1000個項目的限制。謝謝! – Zeratas 2014-09-13 15:57:37

相關問題