我使用Hazelcast 2.0.1頻繁更新數據(一些2分鐘),其中包括首先刪除數據,然後從數據庫加載數據。然而,線程中的某個線程會對某個鍵進行鎖定,從而防止刪除操作並引發異常(java.util.ConcurrentModificationException: Another thread holds a lock for the key: [email protected]
)。請幫助我,我的地圖在hazelcast中得到更新。如何頻繁更新Hazelcast地圖
我給下面
我的代碼DeltaParallelizer
def customerDetails = dataOperations.getDistributedStore(DataStructures.customer_project.name()).keySet()
ExecutorService service = Hazelcast.getExecutorService()
def result
try{
customerDetails?.each{customerEmail->
log.info String.format('Creating delta task for customer:%s',customerEmail)
def dTask = new DistributedTask(new EagerDeltaTask(customerEmail))
service.submit(dTask);
}
customerDetails?.each {customerEmail ->
log.info String.format('Creating task customer aggregation for %s',customerEmail)
def task = new DistributedTask(new EagerCustomerAggregationTask(customerEmail))
service.submit(task)
}
}
catch(Exception e){
e.printStackTrace()
}
EagerDeltaTask
class EagerDeltaTask implements Callable,Serializable {
private final def emailId
EagerDeltaTask(email){
emailId = email
}
@Override
public Object call() throws Exception {
log.info(String.format("Eagerly computing delta for %s",emailId))
def dataOperations = new DataOperator()
def tx = Hazelcast.getTransaction()
tx.begin()
try{
deleteAll(dataOperations)
loadAll(dataOperations)
tx.commit()
}
catch(Exception e){
tx.rollback()
log.error(String.format('Delta computation is screwed while loading data for the project:%s',emailId),e)
}
}
private void deleteAll(dataOperations){
log.info String.format('Deleting entries for customer %s',emailId)
def projects = dataOperations.getDistributedStore(DataStructures.customer_project.name()).get(emailId)
projects?.each{project->
log.info String.format('Deleting entries for project %s',project[DataConstants.PROJECT_NUM.name()])
def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])?.collect{it[DataConstants.SR_NUM.name()]}
def activitiesStore = dataOperations.getDistributedStore(DataStructures.sr_activities.name())
srs?.each{sr ->
activitiesStore.remove(sr)
}
dataOperations.getDistributedStore(DataStructures.project_sr_aggregation.name()).remove(project[DataConstants.PROJECT_NUM.name()])
}
dataOperations.getDistributedStore(DataStructures.customer_project.name()).remove(emailId)
}
private void loadAll(dataOperations){
log.info(String.format('Loading entries for customer %s',emailId))
def projects = dataOperations.projects(emailId)
projects?.each{project->
log.info String.format('Loading entries for project %s',project[DataConstants.PROJECT_NUM.name()])
def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])
srs?.each{sr->
dataOperations.activities(sr[DataConstants.SR_NUM.name()])
}
}
}
}
DataOperator
class DataOperator {
def getDistributedStore(String name){
Hazelcast.getMap(name)
}
}
我得到例外deleteAll SRS,因此一些地圖內容將被刪除,並且只有內容被刪除且地圖的其餘部分具有舊數據的地圖纔會加載新數據。所以我沒有在我的Hazelcast地圖中獲得更新的數據。請就我如何將更新的數據放入我的Hazelcast地圖建議您的看法。
也是這樣Hazelcast.getTransaction
客戶端爲此目的而工作?
注:客戶可以有很多project_num,1個project_num可以由多個用戶共享太 1 project_num可以有多個SR_NUM