
How to update Hazelcast maps frequently

I am using Hazelcast 2.0.1 and update my data frequently (about every 2 minutes): the update first deletes the data and then reloads it from the database. However, some thread acquires a lock on a key, which blocks the delete operation and throws an exception (java.util.ConcurrentModificationException: Another thread holds a lock for the key: [email protected]). Please help me get my maps in Hazelcast updated.

I am giving my code below.

DeltaParallelizer

def customerDetails = dataOperations.getDistributedStore(DataStructures.customer_project.name()).keySet()
ExecutorService service = Hazelcast.getExecutorService()

try {
    // Submit one delta task per customer to delete and reload that customer's data.
    customerDetails?.each { customerEmail ->
        log.info String.format('Creating delta task for customer: %s', customerEmail)
        def dTask = new DistributedTask(new EagerDeltaTask(customerEmail))
        service.submit(dTask)
    }
    // Submit one aggregation task per customer.
    customerDetails?.each { customerEmail ->
        log.info String.format('Creating customer aggregation task for %s', customerEmail)
        def task = new DistributedTask(new EagerCustomerAggregationTask(customerEmail))
        service.submit(task)
    }
}
catch (Exception e) {
    e.printStackTrace()
}

EagerDeltaTask

class EagerDeltaTask implements Callable, Serializable {
    private final def emailId

    EagerDeltaTask(email) {
        emailId = email
    }

    @Override
    public Object call() throws Exception {
        log.info(String.format('Eagerly computing delta for %s', emailId))
        def dataOperations = new DataOperator()
        // Wrap delete + reload in a Hazelcast transaction so a failure rolls
        // both back instead of leaving the maps half-deleted.
        def tx = Hazelcast.getTransaction()
        tx.begin()
        try {
            deleteAll(dataOperations)
            loadAll(dataOperations)
            tx.commit()
        }
        catch (Exception e) {
            tx.rollback()
            log.error(String.format('Delta computation failed while loading data for customer: %s', emailId), e)
        }
        return null
    }

    private void deleteAll(dataOperations) {
        log.info String.format('Deleting entries for customer %s', emailId)
        def projects = dataOperations.getDistributedStore(DataStructures.customer_project.name()).get(emailId)
        projects?.each { project ->
            log.info String.format('Deleting entries for project %s', project[DataConstants.PROJECT_NUM.name()])
            // Collect the SR numbers for this project, then remove their activities.
            def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])?.collect { it[DataConstants.SR_NUM.name()] }
            def activitiesStore = dataOperations.getDistributedStore(DataStructures.sr_activities.name())
            srs?.each { sr ->
                activitiesStore.remove(sr)
            }
            dataOperations.getDistributedStore(DataStructures.project_sr_aggregation.name()).remove(project[DataConstants.PROJECT_NUM.name()])
        }
        dataOperations.getDistributedStore(DataStructures.customer_project.name()).remove(emailId)
    }

    private void loadAll(dataOperations) {
        log.info(String.format('Loading entries for customer %s', emailId))
        def projects = dataOperations.projects(emailId)
        projects?.each { project ->
            log.info String.format('Loading entries for project %s', project[DataConstants.PROJECT_NUM.name()])
            def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])
            srs?.each { sr ->
                dataOperations.activities(sr[DataConstants.SR_NUM.name()])
            }
        }
    }
}

DataOperator

class DataOperator {
    // Thin wrapper around the Hazelcast 2.x static map accessor.
    def getDistributedStore(String name) {
        Hazelcast.getMap(name)
    }
}

I get the exception in deleteAll on the SRs, so some of the map contents are deleted, and new data is loaded only into the maps whose contents were removed; the remaining maps keep the old data. As a result I never end up with fully updated data in my Hazelcast maps. Please share your thoughts on how I can get the updated data into my Hazelcast maps.
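For illustration, one way to avoid this key-lock collision is to take the per-key distributed lock before removing an entry, so no other thread can hold it mid-delete. This is only a minimal Groovy sketch against the Hazelcast 2.x IMap lock API, reusing the emailId field from EagerDeltaTask; it is not from the original post:

def store = Hazelcast.getMap(DataStructures.customer_project.name())
// IMap.lock acquires a per-key distributed lock in Hazelcast 2.x; while held,
// other threads' lock attempts and updates on this key will wait.
store.lock(emailId)
try {
    store.remove(emailId)
} finally {
    store.unlock(emailId)
}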

Also, does Hazelcast.getTransaction work from a client for this purpose?

Note: a customer can have many project_nums, one project_num can be shared by multiple customers, and one project_num can have multiple sr_nums.

Answer

I solved my problem with the Hazelcast eviction policy. I used <time-to-live-seconds>300</time-to-live-seconds>, which clears the map contents every 5 minutes; when a request for any of the maps then comes in from the UI, the map contents are reloaded from the loader.

Below is the Hazelcast map configuration:

... 
<map name="customer_project" > 
    <map-store enabled="true"> 
     <class-name>com.abc.arena.datagrid.loader.CustomerProjectData</class-name> 
    </map-store> 
    <time-to-live-seconds>300</time-to-live-seconds> 
</map> 
... 

The CustomerProjectData loader class simply loads the data into the map from one of the databases. So now I no longer need the DeltaParallelizer or EagerDeltaTask classes.
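For reference, a minimal sketch of what such a loader might look like, assuming Hazelcast 2.x's com.hazelcast.core.MapLoader interface and reusing the DataOperator.projects() call from the question (the actual database access inside CustomerProjectData is not shown in the post):

import com.hazelcast.core.MapLoader

class CustomerProjectData implements MapLoader<String, Object> {

    @Override
    Object load(String emailId) {
        // Fetch this customer's projects from the database (hypothetical reuse
        // of the question's DataOperator; the real loader may differ).
        new DataOperator().projects(emailId)
    }

    @Override
    Map<String, Object> loadAll(Collection<String> keys) {
        // Hazelcast calls this for bulk reads; load each requested key.
        def result = [:]
        keys.each { result[it] = load(it) }
        return result
    }

    @Override
    Set<String> loadAllKeys() {
        // Returning null tells Hazelcast not to pre-load any keys at startup.
        return null
    }
}

With a loader like this configured in the map-store element, any IMap.get on an entry evicted by time-to-live-seconds falls through to load(), which is what makes the TTL-based refresh work.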

Different approaches are also welcome :)