我一直弄不清楚爲什麼會不斷看到下面這個例外(StaleObjectStateException):
`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`
我有一個使用 Quartz 來排程並觸發作業的服務。在我的情境中,這些作業被稱爲 Flows,每個 Flow 可能由數個 Tasks 組成;Flow 和 Task 都是 Executables,而關於它們實際執行(Executions)的資訊則分別儲存爲 FlowExecutions 和 TaskExecutions。該服務使用 FlowService 啓動流程。
UPD:有一個 Quartz 作業「ExecutorJob」負責觸發我的流程/任務。當它被觸發時,它會使用 FlowService 來啓動它應該執行的內容。所以我想知道,是否有可能 Quartz 執行緒不會在每次使用服務時建立新的 Hibernate 會話,而這正是問題的原因。我沒有改變 FlowService 的作用域,所以它是一個單例;GORM 是如何管理它所使用的會話的?
UPD2:我嘗試在 ExecutorJob 上使用 persistenceContextInterceptor,以確保每次使用該服務時都使用新的會話,但這並沒有解決問題。已補上 ExecutorJob 的簡化程式碼。
我無法在本地重現問題,但它在生產環境中經常發生,更具體地說,是在有大量流程同時啓動的時候。我試過將任務和流程的 execute 方法設爲同步(synchronized),但沒有奏效。我現在會嘗試使用悲觀鎖,但我猜它也無法解決問題,因爲從應用程式日誌來看,似乎並沒有兩個執行緒在更新同一行。以下是一個模擬專案結構的簡化版本。
// ------------------
// DOMAIN CLASSES
// ------------------
/**
 * Common base for anything that can be executed.
 *
 * Executions are not mapped as a collection here: 'executions' is declared
 * transient and is looked up with a dynamic finder on every access.
 */
abstract class Executable {
    static hasMany = [flowTasks: FlowTask]
    static transients = ['executions']

    /**
     * Finds the executions that reference this executable.
     *
     * @return the result of Execution.findAllByExecutable(this), or an empty
     *         list when this instance has no (truthy) id
     */
    List<Execution> getExecutions() {
        if (this.id) {
            return Execution.findAllByExecutable(this)
        }
        return []
    }

    /**
     * Points the given execution at this executable and saves the execution.
     *
     * @param execution the execution to associate and save
     */
    void addToExecutions(Execution execution) {
        execution.executable = this
        execution.save()
    }

    /**
     * Runs this executable.
     *
     * @param params parameters for the run
     * @return the executions produced by the run
     */
    abstract List<Execution> execute(Map params)
}
/**
 * An executable made of a sorted set of FlowTasks.
 *
 * Executing a flow creates one FlowExecution per parameter map, runs the
 * flow's first tasks and records the resulting executions on that
 * FlowExecution.
 */
class Flow extends Executable {
    SortedSet<FlowTask> tasks
    static hasMany = [tasks: FlowTask]

    // Static monitors: they are shared by every Flow instance in this JVM,
    // so the guarded sections below run one at a time across all flows.
    private static final Object lockExecute = new Object()
    private static final Object lockExecuteTask = new Object()

    /**
     * Starts this flow once for every parameter map produced by
     * multiplyParams(params).
     *
     * @param params parameters for the run
     * @return one FlowExecution per started run
     */
    List<FlowExecution> execute(Map params) {
        synchronized (lockExecute) {
            List<Map> multiParams = multiplyParams(params)
            multiParams.collect { Map param ->
                FlowExecution flowExecution = new FlowExecution()
                // addToExecutions (inherited) sets flowExecution.executable and saves it
                addToExecutions(flowExecution)
                flowExecution.save()
                this.attach()
                save()
                executeTasks(firstTasks(param), flowExecution, param)
            }
        }
    }

    /**
     * Expands the incoming parameters into the list of parameter maps for
     * the executions that must be started.
     *
     * @param params parameters for the run
     * @return list of parameter maps; in this version, a single-element list
     */
    List<Map> multiplyParams(Map params) {
        // creates a list of params for the executions that must be started
        [params]
    }

    /**
     * Finds the first tasks to be executed for the flow.
     *
     * @param params parameters for the run (not used by this version)
     * @return the selected tasks; in this version, every task of the flow
     */
    Set<FlowTask> firstTasks(Map params) {
        // finds the first tasks to be executed for the flow
        // Fixed: the original called the non-existent method 'findAdll'.
        tasks.findAll { true }
    }

    /**
     * Executes the given tasks, attaches every resulting execution to the
     * flow execution, then saves this flow with a flush.
     *
     * @param tasks         tasks to run
     * @param flowExecution flow execution that collects the task executions
     * @param params        parameters passed to each task
     * @return the same flowExecution instance
     * @throws Exception whatever a task's execute throws is rethrown
     * @throws HibernateOptimisticLockingFailureException rethrown when the
     *         flushing save fails with an optimistic-locking error
     */
    private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
        synchronized (lockExecuteTask) {
            tasks.each { FlowTask flowTask ->
                try {
                    List<Execution> executions = flowTask.execute(params)
                    executions.each { Execution execution ->
                        flowExecution.addToExecutions(execution)
                    }
                    flowExecution.attach()
                } catch (Exception e) {
                    // Fixed: the original 'catch {' declared no exception
                    // variable, so 'throw e' had nothing to rethrow.
                    // log error executing task
                    throw e
                }
            }
            this.attach()
            try {
                save(flush: true)
            } catch (HibernateOptimisticLockingFailureException e) {
                // log error saving flow
                throw e
            }
            flowExecution
        }
    }
}
/**
 * Executable that represents a single task. Running it creates a
 * TaskExecution per parameter map and hands each one to taskService.start.
 *
 * NOTE(review): 'taskService' is not declared anywhere in this class in the
 * visible code - presumably it is injected by the framework; confirm.
 */
class Task extends Executable {
// Static monitors: shared by every Task instance in this JVM.
private static final Object lockExecute = new Object()
private static final Object lockGetExecution = new Object()
/**
 * Starts an already-created execution through taskService.
 *
 * @param execution the execution to start
 * @return the same execution instance
 */
TaskExecution execute(TaskExecution execution) {
taskService.start(execution)
execution
}
/**
 * Creates and starts one TaskExecution for every parameter map produced by
 * multiplyParams(params). The whole body runs while holding lockExecute.
 *
 * @param params parameters for the run
 * @return the started executions, one per parameter map
 */
List<TaskExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiExecParams = multiplyParams(params)
multiExecParams.collect { Map param ->
TaskExecution execution = getExecution(param)
execute(execution)
}
}
}
/**
 * Builds a new TaskExecution for this task, stores the parameters on it,
 * saves it and then saves this task with a flush. Runs while holding
 * lockGetExecution.
 *
 * @param params parameters to record on the new execution
 * @return the newly created execution
 * @throws HibernateOptimisticLockingFailureException rethrown when the
 *         flushing save fails with an optimistic-locking error
 */
TaskExecution getExecution(Map params) {
synchronized (lockGetExecution) {
TaskExecution execution = new TaskExecution(executable: this)
execution.setParameters(params)
// inherited addToExecutions sets execution.executable and saves the execution
addToExecutions(execution)
execution.attach()
// flowExecution may be null here, hence the safe navigation
execution.flowExecution?.attach()
this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving task
throw e
}
execution
}
}
/**
 * Expands the incoming parameters into the list of parameter maps for the
 * tasks that must be started.
 *
 * @param params parameters for the run
 * @return list of parameter maps; in this version, a single-element list
 */
List<Map> multiplyParams(Map params) {
// creates a list of params for the tasks that must be started
[params]
}
}
/**
 * Link between a Flow and one of the Executables it runs.
 */
class FlowTask {
    static belongsTo = [flow: Flow, executable: Executable]

    /**
     * Delegates to the linked executable.
     *
     * @param params parameters for the run
     * @return the executions returned by executable.execute(params)
     */
    List<Execution> execute(Map params) {
        def target = executable
        return target.execute(params)
    }
}
/**
 * Base record of one run of an Executable.
 *
 * Parameter values are kept in parameterData, converted with
 * JsonParser.toJson.
 */
abstract class Execution {
    Map parameterData = [:]
    static belongsTo = [executable: Executable, flowExecution: FlowExecution]
    static transients = ['parameters', 'taskExecutions']

    /**
     * Stores every entry of params in parameterData under the same key,
     * with the value converted by JsonParser.toJson.
     *
     * @param params parameters to record
     */
    void setParameters(Map params) {
        params.each { Map.Entry entry ->
            parameterData.put(entry.key, JsonParser.toJson(entry.value))
        }
    }
}
/**
 * Execution record for a Task. Adds nothing to Execution in this version.
 */
class TaskExecution extends Execution {
}
/**
 * Execution record for a Flow.
 *
 * The child executions are held in the transient 'executions' list, which
 * is filled in onLoad() with a dynamic finder rather than mapped as a
 * collection.
 */
class FlowExecution extends Execution {
    List<Execution> executions
    static transients = ['executions']

    /** Starts with an empty executions list. */
    FlowExecution() {
        executions = []
    }

    /**
     * Collects 'taskExecution' from every child execution, flattens the
     * result and returns it as a set.
     *
     * NOTE(review): 'taskExecution' is not a property declared on Execution
     * in the visible code - confirm where it comes from.
     *
     * @return the collected set, or null when executions is null
     */
    Set<TaskExecution> getTaskExecutions() {
        executions?.collect { Execution execution ->
            return execution.taskExecution
        }?.flatten()?.toSet()
    }

    /**
     * Adds the execution to the in-memory list, points it back at this flow
     * execution and saves it.
     *
     * @param execution the child execution to add and save
     */
    void addToExecutions(Execution execution) {
        executions.add(execution)
        execution.flowExecution = this
        execution.save()
    }

    /**
     * Fills the executions list with Execution.findAllByFlowExecution(this)
     * when this instance has an id, otherwise with an empty list.
     *
     * If the lookup fails, the error is logged with its stack trace and the
     * list is reset to empty. The original passed the exception as the log
     * message (dropping the stack trace) and its fallback '[]' was a bare
     * expression that assigned nothing.
     */
    def onLoad() {
        try {
            executions = this.id ? Execution.findAllByFlowExecution(this) : []
        } catch (Exception e) {
            log.error('Could not load executions for FlowExecution ' + this.id, e)
            executions = []
        }
    }
}
// -----------------
// SERVICE CLASSES
// -----------------
/**
 * Service entry point for starting flows.
 */
class FlowService {
    /**
     * Loads the flow with Flow.lock(flowId) and executes it.
     *
     * @param flowId id of the flow to start
     * @param params parameters for the run
     * @return a map with 'data' ([success: true]) and 'status' (HTTP_OK)
     */
    Map start(long flowId, Map params) {
        Flow flow = Flow.lock(flowId)
        startFlow(flow, params)
    }

    /**
     * Executes the flow and builds the response map.
     *
     * The original stored the result in an unused local typed
     * List<RunningFlow>; no RunningFlow type appears in this file and
     * Flow.execute is declared to return List<FlowExecution>, so the local
     * was removed.
     *
     * @param flow   the flow to execute
     * @param params parameters for the run
     * @return a map with 'data' ([success: true]) and 'status' (HTTP_OK)
     */
    private Map startFlow(Flow flow, Map params) {
        flow.execute(params)
        [data: [success: true], status: HTTP_OK]
    }
}
//--------------------------------------
// Quartz job
//--------------------------------------
/**
 * Quartz job that starts an executable through flowService or taskService,
 * inside a persistence context set up by the 'persistenceInterceptor' bean.
 */
class ExecutorJob implements InterruptableJob {
    def grailsApplication = Holders.getGrailsApplication()
    static triggers = {}

    // Thread currently running execute(); used by interrupt().
    private Thread thread

    /**
     * Runs the executable identified by 'executableId' in the merged job
     * data map. The whole body runs while holding LockContainer.taskLock.
     *
     * Exceptions thrown while starting the executable are caught and not
     * propagated (as in the original).
     *
     * @param context Quartz execution context carrying the job data
     */
    void execute(JobExecutionContext context) throws JobExecutionException {
        thread = Thread.currentThread()
        synchronized (LockContainer.taskLock) {
            Map params = context.mergedJobDataMap
            def persistenceInterceptor = persistenceInterceptorInstance
            try {
                persistenceInterceptor.init()
                Long executableId = params.executableId as Long
                // Anything that is not a Flow (including a missing row) goes to taskService.
                def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
                service.start(executableId, params)
            } catch (Exception e) {
                // log error
            } finally {
                // Fixed: destroy() now runs even when flush() throws, so the
                // persistence context is always released.
                try {
                    persistenceInterceptor.flush()
                } finally {
                    persistenceInterceptor.destroy()
                }
            }
        }
    }

    /** @return the 'persistenceInterceptor' bean from the main context */
    PersistenceContextInterceptor getPersistenceInterceptorInstance() {
        grailsApplication.mainContext.getBean('persistenceInterceptor')
    }

    /**
     * Fixed: the return type was 'FluxoService', which does not match the
     * FlowService class this file defines for the 'flowService' bean.
     *
     * @return the 'flowService' bean from the main context
     */
    FlowService getFlowServiceInstance() {
        grailsApplication.mainContext.getBean('flowService')
    }

    /**
     * NOTE(review): 'TarefaService' is not defined in the visible code -
     * confirm it is the declared type of the 'taskService' bean.
     *
     * @return the 'taskService' bean from the main context
     */
    TarefaService getTaskServiceInstance() {
        grailsApplication.mainContext.getBean('taskService')
    }

    /** Interrupts the thread recorded by the last execute() call, if any. */
    @Override
    void interrupt() throws UnableToInterruptJobException {
        thread?.interrupt()
    }
}
有人知道任何可能有幫助的資訊嗎?
檢索 Flow 實例並調用 Flow.execute() 的程式碼在哪裏? –
就是 FlowService 的 start 方法。 –
很好,這正是我需要看到的程式碼。 –