2016-11-05 70 views
0

我有一個hibernate類,需要3個不同的會話。它目前使用2個會話並且完美地工作。第一個會話用於從外部數據庫讀取數據。第二個會話用於將數據保存到我們的內部數據庫。我添加了第三個會話,因爲我們需要跟蹤事務,而不管主事務是否成功(XXXXUpdate對象)。我的問題是新會話掛在tx.commit()上。休眠掛在tx.commit

private synchronized void executeUpdate(Long manualUpdateTagIndex) throws Exception { 
    LogPersistenceLoggingContext ctx = new LogPersistenceThreadContext().getLogPersistenceLoggingContext(); 

    DateTime minTriggerDate = parseDateTimeIfNotNull(minTriggerTime); 
    DateTime maxTriggerDate = parseDateTimeIfNotNull(maxTriggerTime); 
    Session webdataSession = null; 
    Session XXXXUpdateSession = null; 
    XXXXUpdate update = new XXXXUpdate(); 
    update.setExecutedAt(new DateTime()); 
    update.setStatus(WebdataUpdateStatus.Success); 

    boolean commit = true; 
    int tagCount = 0; 
    List<Period> tagPeriods = new ArrayList<>(); 
    Map<Long, DateTime> tagIndexes = new LinkedHashMap<>(); 

    try { 

     XXXXUpdateSession = accountingService.openUnmanagedSession(); 
     XXXXUpdateSession.getTransaction().begin(); 
     XXXXUpdateSession.save(update); 

     HierarchicalLogContext logCtx = new HierarchicalLogContext(String.valueOf(update.getId())); 
     ctx.pushLoggingContext(logCtx); 

     ctx.log(logger, Level.INFO, new XXXXLogMarker(), "Executing XXXX data transfer", new Object[]{}); 
     if (webdataSessionFactory == null){ 
      throw new Exception("Failed to obtain webdata session factory. See earlier log entries"); 
     } 
     try { 
      webdataSession = webdataSessionFactory.openSession(); 
     } catch (Exception ex) { 
      update.setStatus(WebdataUpdateStatus.ConnectionError); 
      throw new Exception("Failed to obtain webdata connection", ex); 
     } 

     webdataSession.getTransaction().begin(); 

     if (manualUpdateTagIndex == null) { // automatic tags update 

      XXXXUpdate lastUpdate = (XXXXUpdate) HibernateUtil.getCurrentSpringManagedSession() 
        .createCriteria(XXXXUpdate.class) 
        .add(Restrictions.isNotNull("latestTriggerTimestamp")) 
        .add(Restrictions.eq("status", WebdataUpdateStatus.Success)) 
        .add(Restrictions.eq("manualUpdate", false)) 
        .addOrder(Order.desc("latestTriggerTimestamp")) 
        .setMaxResults(1).uniqueResult(); 

      DateTime lastUpdatedDate = Period.defaultEffectiveInstant; 
      if (minTriggerDate != null) { 
       lastUpdatedDate = minTriggerDate; 
      } 

      if (lastUpdate != null && lastUpdate.getLatestTriggerTimestamp() != null) { 
       lastUpdatedDate = lastUpdate.getLatestTriggerTimestamp(); 
       ctx.log(logger, Level.INFO, new XXXXLogMarker(), 
         "Querying for tag event triggers newer than last update timestamp [" + lastUpdate.getLatestTriggerTimestamp() + "]", new Object[]{}); 
      } else { 
       ctx.log(logger, Level.INFO, new XXXXLogMarker(), "Update has never run. Catching up with history", new Object[]{}); 
      } 

      @SuppressWarnings("unchecked") 
      List<XXXXProcessedTagRequest> processedReqs = HibernateUtil.getCurrentSpringManagedSession() 
        .createCriteria(XXXXProcessedTagRequest.class).list(); 

      Query triggerQuery = webdataSession.createQuery(
        "select trigger, " 
          + "trigger.TagIndex," 
          + "req " 
          + "from XXXXTagEventTrigger as trigger " 
          + "join trigger.req as req " 
          + "where trigger.EventType in (:eventTypes) " 
          + "and trigger.timestamp > :lastUpdateMinusDelta " 
          + (maxTriggerDate != null?"and trigger.timestamp < :maxDate ":"") 
          + "and req.CurrentState = :currentState " 
          + "order by trigger.timestamp,trigger.reqIndex"); 

      triggerQuery.setParameterList("eventTypes", new Object[]{5, 9}); 
      triggerQuery.setParameter("lastUpdateMinusDelta", lastUpdatedDate.minusHours(hoursToKeepProcessedReqs)); 
      if (maxTriggerDate != null){ 
       triggerQuery.setParameter("maxDate", maxTriggerDate); 
      } 
      triggerQuery.setParameter("currentState", 2); 

      @SuppressWarnings("unchecked") 
      List<Object[]> allTriggers = triggerQuery.list(); 

      List<Object[]> unprocessedTriggers = removeProcessedTags(new ArrayList<Object[]>(allTriggers),processedReqs,ctx); 

      for (Object[] row : unprocessedTriggers) { 
       XXXXTagEventTrigger trigger = (XXXXTagEventTrigger) row[0]; 

       if (lastUpdatedDate == null || lastUpdatedDate.isBefore(trigger.getTimestamp().getMillis())) { 
        lastUpdatedDate = new DateTime(trigger.getTimestamp()); 
       } 

       tagIndexes.put((Long) row[1], new DateTime(trigger.getTimestamp())); 

       XXXXProcessedTagRequest processedReq = new XXXXProcessedTagRequest(); 
       processedReq.setReqIndex(((XXXXTagReq)row[2]).getReqIndex()); 
       processedReq.setTimestamp(trigger.getTimestamp()); 

       HibernateUtil.getCurrentSpringManagedSession().save(processedReq); 
      } 

      ctx.log(logger, Level.INFO, new XXXXLogMarker(), 
        "Found [" + unprocessedTriggers.size() + "] tag event triggers on [" + tagIndexes.size() + "] tags", new Object[]{}); 

      update.setLatestTriggerTimestamp(lastUpdatedDate); 
     } else { // manual tag update 
      ctx.log(logger, Level.INFO, new XXXXLogMarker(), "Executing manual update for tag index [" + manualUpdateTagIndex + "]", new Object[]{}); 

      DateTime now = new DateTime(); 
      tagIndexes.put(manualUpdateTagIndex, now); 
      update.setLatestTriggerTimestamp(now); 
      update.setManualUpdate(true); 
     } 

     if (tagIndexes.size() > 0) { 

      int totalTagCount = tagIndexes.size(); 

      while (!tagIndexes.isEmpty()) { 
       List<Long> batchIndexes = new ArrayList<>(); 
       Iterator<Map.Entry<Long, DateTime>> indexIt = tagIndexes.entrySet().iterator(); 

       while (indexIt.hasNext() && batchIndexes.size() < tagBatchSize) { 
        batchIndexes.add(indexIt.next().getKey()); 
        indexIt.remove(); 
       } 

       Map<Long, LocalTag> existingTags = new HashMap<>(); 
       @SuppressWarnings("unchecked") 
       List<LocalTag> existingTagIds = HibernateUtil.getCurrentSpringManagedSession() 
         .createCriteria(LocalTag.class) 
         .add(Restrictions.in("tagIndex", batchIndexes)) 
         .add(Restrictions.eq("currentVersion", true)).list(); 

       for (LocalTag lt : existingTagIds) { 
        existingTags.put(lt.getTagIndex(), lt); 
       } 

       ctx.log(logger, Level.INFO, new XXXXLogMarker(), 
         "Processing tag updates [" + tagCount + "-" + (tagCount + batchIndexes.size()) + "] of [" + totalTagCount + "]", new Object[]{}); 

       Criteria tagCriteria = webdataSession.createCriteria(XXXXTag.class); 
       tagCriteria.add(Restrictions.in("TagIndex", batchIndexes)); 
       if (!includeTestTags) { 
        tagCriteria.add(Restrictions.eq("TestTag", "0")); 
       } 
       tagCriteria.setFetchMode("XXXXTagMS", FetchMode.JOIN); 
       tagCriteria.setFetchMode("XXXXTagPS", FetchMode.JOIN); 
       tagCriteria.setFetchMode("XXXXTagCCList", FetchMode.JOIN); 
       tagCriteria.setFetchMode("XXXXTagTA", FetchMode.JOIN); 
       tagCriteria.setFetchMode("XXXXTagCP", FetchMode.JOIN); 
       tagCriteria.setResultTransformer(CriteriaSpecification.DISTINCT_ROOT_ENTITY); 

       @SuppressWarnings("unchecked") 
       List<XXXXTag> tags = tagCriteria.list(); 

       if (manualUpdateTagIndex != null && tags.isEmpty()) { 
        throw new ValidationException("No tag found for manual update tag index [" + manualUpdateTagIndex + "]"); 
       } 

       for (XXXXTag tag : tags) { 
        update.getProcessedTags().add(updateTag(tag, tagIndexes.get(tag.getTagIndex()), existingTags)); 
        tagCount++; 
        if (fireEventLastActions.contains(tag.getLastAction().trim())) { 
         tagPeriods.add(new Period(tag.getStartTime().getMillis(), tag.getStopTime().getMillis())); 
        } 
       } 

       HibernateUtil.getCurrentSpringManagedSession().flush(); 
       HibernateUtil.getCurrentSpringManagedSession().clear(); 

       webdataSession.clear(); 
      } 
     } else { 
      ctx.log(logger, Level.INFO, new XXXXLogMarker(), "No updates found", new Object[]{}); 
     } 

     HibernateUtil.getCurrentSpringManagedSession() 
     .createQuery("delete XXXXUpdate where executedAt < :purgeDate") 
     .setParameter("purgeDate", new DateTime().minusDays(daysToKeepUpdateHistory)) 
     .executeUpdate(); 

     HibernateUtil.getCurrentSpringManagedSession() 
     .createQuery("delete XXXXProcessedTagRequest where timestamp < :purgeDate") 
     .setParameter("purgeDate", new DateTime().minusHours(hoursToKeepProcessedReqs)) 
     .executeUpdate(); 

     update.setStatus(WebdataUpdateStatus.Success); 
     update.setTagCount(update.getProcessedTags().size()); 

     tagPeriods = Period.merge(tagPeriods); 

     for (Period p : tagPeriods) { 
      XXXXUpdatePeriod oup = new XXXXUpdatePeriod(); 
      oup.setXXXXUpdate(update); 
      oup.setStartDate(p.getStart()); 
      oup.setEndDate(p.getEnd()); 
      update.getPeriods().add(oup); 
     } 

     HibernateUtil.getCurrentSpringManagedSession().flush(); 

     ctx.log(logger, Level.INFO, new XXXXLogMarker(), "XXXX data transfer complete. Transferred [" + tagCount + "] tag updates", new Object[]{}); 

     ctx.popLoggingContext(logCtx); 
    } catch (Exception ex) { 
     HibernateUtil.getCurrentSpringManagedSession().clear(); 
     update.getProcessedTags().clear(); 
     update.setTagCount(0); 
     update.setStatus(WebdataUpdateStatus.TransferError); 
     commit = false; 
     ctx.log(logger, Level.ERROR, new XXXXLogMarker(), "XXXX data transfer failed", new Object[]{}, ex); 
     throw new Exception("XXXX data transfer failed", ex); 
    } finally { 
     try { 

      XXXXUpdateSession.saveOrUpdate(update); 
      XXXXUpdateSession.getTransaction().commit(); 

     } catch (Exception ex) { 
      commit = false; 
      ctx.log(logger, Level.ERROR, new XXXXLogMarker(), "Failed to save XXXX transfer update record", new Object[]{}, ex); 
      throw new Exception("Failed to save XXXX transfer update record", ex); 
     } finally { 
      if (!commit) { 
       webdataSession.getTransaction().rollback(); 
      } else { 
       webdataSession.getTransaction().commit(); 
      } 
      ResourceDisposer.dispose(webdataSession); 
     } 

    } 

} 

新會話是XXXXUpdateSession。唯一的新代碼是與本次會話相關的代碼。這是一種計時問題,因爲當我使用hibernate調試日誌時,tx提交沒有問題。當我嘗試調試hibernate commit()時,它也會提交。我沒有太多的休眠經驗,所以我可能錯過了一些明顯的東西。任何幫助將不勝感激。謝謝。

回答

1

您已經打開兩個交易webdataSession.getTransaction().begin();這是造成問題(20 &上述代碼中的37行)。

您可以在提交第一筆交易後打開第二筆交易。

此外,長時間使用這種方法並不是一種最佳實踐,因爲這些方法很難調試問題併成爲項目維護/支持的噩夢。

+0

謝謝,我同意。我沒有寫。那麼,即使這兩個交易與不同的會話相關聯,它們也不能同時打開? – user3029642

+0

你在哪裏刷新了XXXXUpdateSession? – developer

+0

我在嵌套finally中的方法結束時犯了一個錯誤。 XXXXUpdateSession.saveOrUpdate(update); XXXXUpdateSession.getTransaction()。commit(); 我也嘗試在同一個地方沖洗。同樣的結果。 – user3029642