0
的原因

我有一個困難時期試圖找出我不斷看到的原因:想不通StaleObjectStateException

`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)` 

我有一個使用Quartz調度消防工作服務,在我的背景下,這些作業被稱爲Flows,每個流程可能由幾個Tasks組成,流程和任務爲Executables,關於其實際Executions的信息存儲爲FlowExecutionsTaskExecutions。該服務使用FlowService啓動流程。

UPD:有一個石英工作,「ExecutorJob」負責射擊我的流量/任務。當它被觸發時,它使用FlowService來啓動它所應該做的任何事情。所以我想知道是否有可能石英線程不會在每次使用服務時創建新的休眠會話,這是問題的原因。我沒有改變FlowService的範圍,所以它是一個單例,GORM如何管理它使用的會話?

UPD2:嘗試使用ExecutorJob上的persistenceContextInterceptor來確保每次使用該服務都使用新會話,但它沒有解決問題。添加了ExecutorJob的簡化代碼。

我無法在本地重現問題,但它在生產中經常發生,更具體地說,當有大量流程啓動時。 我試過同步execute方法的任務和流程,但它沒有奏效,我會嘗試使用悲觀鎖現在,但我的猜測是,它不會解決問題,因爲檢查應用程序日誌它似乎在那裏不是兩個線程更新同一行。以下我試圖展示一個模擬項目結構的簡化版本。

// ------------------ 
// DOMAIN CLASSES 
// ------------------ 
abstract class Executable { 
    static hasMany = [flowTasks: FlowTask] 
    static transients = ['executions'] 

    List<Execution> getExecutions() { 
     this.id ? Execution.findAllByExecutable(this) : [] 
    } 

    void addToExecutions(Execution execution) { 
     execution.executable = this 
     execution.save() 
    } 

    abstract List<Execution> execute(Map params) 
} 

class Flow extends Executable { 
    SortedSet<FlowTask> tasks 
    static hasMany = [tasks: FlowTask] 

    private static final Object lockExecute = new Object() 
    private static final Object lockExecuteTask = new Object() 

    List<FlowExecution> execute(Map params) { 
     synchronized (lockExecute) { 
      List<Map> multiParams = multiplyParams(params) 
      multiParams.collect { Map param -> 
       FlowExecution flowExecution = new FlowExecution() 
       addToExecutions(flowExecution) 
       flowExecution.save() 
       this.attach() 
       save() 
       executeTasks(firstTasks(param), flowExecution, param) 
      } 
     } 
    } 

    List<Map> multiplyParams(Map params) { 
     // creates a list of params for the executions that must be started 
     [params] 
    } 

    Set<FlowTask> firstTasks(Map params) { 
     // finds the first tasks to be executed for the flow 
     tasks.findAdll { true } 
    } 

    private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) { 
     synchronized (lockExecuteTask) { 
      tasks.each { FlowTask flowTask -> 
       try { 
        List<Execution> executions = flowTask.execute(params) 
        executions.each { Execution execution -> 
         flowExecution.addToExecutions(execution) 
        } 
        flowExecution.attach() 
       } catch { 
        // log error executing task 
        throw e 
       }    
      } 

      this.attach() 
      try { 
       save(flush: true) 
      } catch (HibernateOptimisticLockingFailureException e) { 
       // log error saving flow 
       throw e 
      } 

      flowExecution 
     } 
    } 

} 

class Task extends Executable { 
    private static final Object lockExecute = new Object() 
    private static final Object lockGetExecution = new Object() 

    TaskExecution execute(TaskExecution execution) { 
     taskService.start(execution) 
     execution 
    } 

    List<TaskExecution> execute(Map params) { 
     synchronized (lockExecute) { 
      List<Map> multiExecParams = multiplyParams(params) 
      multiExecParams.collect { Map param -> 
       TaskExecution execution = getExecution(param) 
       execute(execution) 
      } 
     } 
    } 

    TaskExecution getExecution(Map params) { 
     synchronized (lockGetExecution) { 
      TaskExecution execution = new TaskExecution(executable: this) 
      execution.setParameters(params) 
      addToExecutions(execution) 

      execution.attach() 
      execution.flowExecution?.attach() 
      this.attach() 
      try { 
       save(flush: true) 
      } catch (HibernateOptimisticLockingFailureException e) { 
       // log error saving task 
       throw e 
      } 

      execution 
     } 
    } 

    List<Map> multiplyParams(Map params) { 
     // creates a list of params for the tasks that must be started 
     [params] 
    } 

} 

class FlowTask { 
    static belongsTo = [flow: Flow, executable: Executable] 

    List<Execution> execute(Map params) { 
     executable.execute(params) 
    } 
} 

abstract class Execution { 
    Map parameterData = [:] 
    static belongsTo = [executable: Executable, flowExecution: FlowExecution] 
    static transients = ['parameters', 'taskExecutions'] 
    void setParameters(Map params) { 
     params.each { key, value -> 
      parameterData[key] = JsonParser.toJson(value) 
     } 
    } 
} 

class TaskExecution extends Execution { 
} 

class FlowExecution extends Execution { 
    List<Execution> executions 
    static transients = ['executions'] 

    FlowExecution() { 
     executions = [] 
    } 

    Set<TaskExecution> getTaskExecutions() { 
     executions?.collect { Execution execution -> 
      return execution.taskExecution 
     }?.flatten()?.toSet() 
    } 

    void addToExecutions(Execution execution){ 
     executions.add(execution) 
     execution.flowExecution = this 
     execution.save() 
    } 

    def onLoad() { 
     try { 
      executions = this.id ? Execution.findAllByFlowExecution(this) : [] 
     } catch (Exception e){ 
      log.error(e) 
      [] 
     } 
    } 
} 

// ----------------- 
// SERVICE CLASSES 
// ----------------- 
class FlowService { 

    Map start(long flowId, Map params) { 
     Flow flow = Flow.lock(flowId) 

     startFlow(flow, params) 
    } 

    private Map startFlow(Flow flow, Map params) { 
     List<RunningFlow> runningFlows = flow.execute(params) 

     [data: [success: true], status: HTTP_OK] 
    } 
} 

//-------------------------------------- 
// Quartz job 
//-------------------------------------- 
class ExecutorJob implements InterruptableJob { 

    def grailsApplication = Holders.getGrailsApplication() 

    static triggers = {} 

    private Thread thread 

    void execute(JobExecutionContext context) throws JobExecutionException { 
     thread = Thread.currentThread() 
     synchronized (LockContainer.taskLock) { 
      Map params = context.mergedJobDataMap 
      def persistenceInterceptor = persistenceInterceptorInstance 

      try { 
       persistenceInterceptor.init() 

       Long executableId = params.executableId as Long 

       def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance 
       service.start(executableId, params) 
      } catch (Exception e) { 
       // log error 
      } finally { 
       persistenceInterceptor.flush() 
       persistenceInterceptor.destroy() 
      } 
     } 
    } 

    PersistenceContextInterceptor getPersistenceInterceptorInstance() { 
     grailsApplication.mainContext.getBean('persistenceInterceptor') 
    } 

    FluxoService getFlowServiceInstance() { 
     grailsApplication.mainContext.getBean('flowService') 
    } 

    TarefaService getTaskServiceInstance() { 
     grailsApplication.mainContext.getBean('taskService') 
    } 

    @Override 
    void interrupt() throws UnableToInterruptJobException { 
     thread?.interrupt() 
    }  
} 

任何人都知道的東西,可以幫助?

+0

哪裏是檢索流程實例,並調用Flow.execute()的代碼? –

+0

這是FlowService的啓動方法。 –

+0

優秀。這是我需要看到的代碼。 –

回答

0

那麼,很難理解發生了什麼問題。但是,我猜這個錯誤是在會話中有一個對象已經被其他事務保存或更新時引發的。同樣,當hibernate試圖保存這個對象時,它給出了行被更新錯誤的另一個事務錯誤。

我想你可以在保存你的對象之前嘗試刷新,看看它是怎麼回事。

http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html

def b = Book.get(1) 
… 
b.refresh()