2012-04-11 77 views
0

雖然在處理程序中沒有引發異常,但我得到了NServiceBus重試消息X次的奇怪問題。有一些信息處理NHibernate會話和NSB環境事務。由於沒有錯誤發生,我不是100%確定的問題,因此不能真正決定做什麼。即使沒有異常,NServiceBus重試消息

我被配置NSB與城堡溫莎像這樣:

IWindsorContainer container = new WindsorContainer(new XmlInterpreter()); 
container.Install(new ContainerInstaller()); 
container.Install(new UnitOfWorkInstaller(AppDomain.CurrentDomain.BaseDirectory, Castle.Core.LifestyleType.Scoped)); 
container.Install(new FactoryInstaller(AppDomain.CurrentDomain.BaseDirectory)); 
container.Install(new RepositoryInstaller(AppDomain.CurrentDomain.BaseDirectory)); 

Configure.With() 
    .CastleWindsorBuilder(container) 
    .FileShareDataBus(Properties.Settings.Default.DataBusFileSharePath) 
    .MsmqTransport() 
     .IsTransactional(true) 
     .PurgeOnStartup(false) 
    .UnicastBus() 
     .LoadMessageHandlers() 
     .ImpersonateSender(false) 
    .JsonSerializer(); 

UnitOfWorkInstaller寄存器工作(NHibernate會話)的單位像這樣:

public void Install(IWindsorContainer container, IConfigurationStore store) 
{ 
    var fromAssemblyDescriptor = AllTypes.FromAssemblyInDirectory(new AssemblyFilter(_installationPath)); 
    container.Register(fromAssemblyDescriptor 
     .IncludeNonPublicTypes() 
     .Pick() 
     .If(t => t.GetInterfaces().Any(i => i == typeof(IUnitOfWork)) && t.Namespace.StartsWith("Magma")) 
     .WithService.AllInterfaces() 
     .Configure(con => con.LifeStyle.Is(_lifeStyleType).UsingFactoryMethod(k => k.Resolve<IUnitOfWorkFactory>().Create()))); 
} 

因此,每個時間的消息到達時所有存儲庫都使用相同的工作單元。我讀過手動回滾當前事務結果的錯誤(我真的不知道爲什麼),我也知道NSB爲每個傳輸消息創建一個子容器,並且此子容器在處理消息後處理。問題是,當孩子容器佈置工作單元配置是這樣的:

public void Dispose() 
    { 
     if (!_isDisposed) 
     { 
      DiscardSession(); 
      _isDisposed = true; 
     } 
    } 

    private void DiscardSession() 
    { 
     if (_transaction != null && _transaction.IsActive) 
     { 
      _transaction.Dispose(); 
     } 
     if (Session != null) 
     { 
      Session.Dispose(); 
     } 
    } 

我的處理程序結構是這樣的:(該_unitOfWork作爲一個構造函數依賴通過)

public void Handle(<MessageType> message) 
    { 
     using (_unitOfWork) 
     { 
      try 
      { 
       // do stuff 
       _unitOfWork.Commit(); 
      } 
      catch (Exception ex) 
      { 
       _unitOfWork.Rollback(); 

       // rethrow so the message stays in the queue 
       throw; 
      } 
     } 
    } 

我發現,如果我沒有提交工作單元(刷新會話並提交交易),我得到一個錯誤,表示郵件已重試超過最大重試次數bla bla bla ...

因此,它似乎與NHibernate會話和它的創建和處理方式,但是由於它是在工作單元內創建的,所以我無法真正使用會話工廠。我讀過我可以使用IMessageModule來創建和處理會話,但我不知道這是否正確,因爲我不明白是什麼導致了錯誤。

因此,要回顧:

  • 我使用的工作範圍的單位,使所有的處理程序相依性將共享相同的實例(THX到子容器,BTW:我設置作爲臨時思考的工作單元,子容器將所有瞬態對象視爲該容器內的單例,但我看到工作單元未被共享,因此這就是爲什麼它的設置範圍)

  • 我是將我的處理程序包裝在using(_unitOfWork) { }聲明中,以便在每次處理後處理工作單元。

  • 當下班的單位設置,NHibernate會話也設置

  • 如果我不顯式調用Commit_unitOfWork,消息重試超出最大重試次數,然後拋出一個錯誤。

是什麼導致了這種行爲?而IMessageModule就是這個答案嗎?

+0

查看錯誤隊列中的消息。消息頭中是否有異常消息? – stephenl 2012-04-12 00:30:01

+0

@stephenl Nope,我看到的唯一信息是與''相同的信息。它似乎與NHibernate有關,但我沒有任何與NHibernate的NSB集成。 – 2012-04-12 12:19:44

回答

0

我想我縮小了一下...我刪除了所有的using(_unitOfWork)_unitOfWork.Commit()_unitOfWork.Rollback(),並讓NSB TransactionScope完成提交或回滾事務的工作,因爲NHibernate的會話正在參與NSB事務範圍。

我也開始使用NHibernate會話的事務(Session.Transaction),而不是通過Session.BeginTransaction()獲取對它的引用並使用它。我複製/粘貼了我的UoW實現,以便您可以看到差異(舊代碼在註釋中)。

我不知道我的更改是否解釋了什麼,但使用Session的事務並刪除沖洗,因爲它正在處理事務中提交似乎解決了問題......我不必顯式調用爲了使消息被成功處理,使用了Commit方法。這是我的UoW執行:

public class NHibernateUnitOfWork : INHibernateUnitOfWork 
{ 
    //private ITransaction _transaction; 
    private bool _isDisposed; 
    private bool _isInError; 

    public ISession Session { get; protected set; } 

    public NHibernateUnitOfWork(ISession session) 
    { 
     Contract.Requires(session != null, "session"); 
     Session = session; 

     //_transaction = Session.BeginTransaction(); 

     // create a new transaction as soon as the session is available 
     Session.BeginTransaction(); 
     _isDisposed = false; 
     _isInError = false; 
    } 

    public void MarkCreated(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.SaveOrUpdate(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void MarkUpdated(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.Update(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void MarkSavedOrUpdated(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.SaveOrUpdate(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void MarkDeleted(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.Delete(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void Commit() 
    { 
     // assert stuff 

     try 
     { 
      //Session.Flush(); 
      //_transaction.Commit(); 
      Session.Transaction.Commit(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void Rollback() 
    { 
     // assert stuff 

     try 
     { 
      //if (!_transaction.WasRolledBack) 
      //{ 
      // _transaction.Rollback(); 
      //} 
      Session.Transaction.Rollback(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void Dispose() 
    { 
     if (!_isDisposed) 
     { 
      DiscardSession(); 
      _isDisposed = true; 
     } 
    } 

    private void DiscardSession() 
    { 
     //if (_transaction != null && _transaction.IsActive) 
     //{ 
     // _transaction.Dispose(); 
     //} 
     if (Session != null) 
     { 
      try 
      { 
       // rollback all uncommitted changes 
       if (Session.Transaction != null && Session.Transaction.IsActive) 
       { 
        Session.Transaction.Rollback(); 
       } 
       //Session.Clear(); 
       Session.Close(); 
      } 
      catch (Exception) 
      { } 
      finally 
      { 
       Session.Dispose(); 
      } 
     } 
    } 

    private void HandleError() 
    { 
     _isInError = true; 
     //if (_transaction != null && _transaction.IsActive) 
     //{ 
     // _transaction.Rollback(); 
     //} 
     if (Session.Transaction != null && Session.Transaction.IsActive) 
     { 
      Session.Transaction.Rollback(); 
     } 
    } 

    // assert methods 
} 

這是否有任何意義?我仍然不知道是什麼造成了錯誤,但它似乎與事務處理範圍完成之前處理NHibernate會話有關。

+0

我只注意到它只在UoW設置爲瞬態時才起作用。如果它被設置爲作用域,則不起作用。我認爲Castle Windsor支持兒童容器,但它似乎不起作用......如果我的處理程序有兩個依賴關係,一個是UoW本身,另一個是也對UoW有依賴關係的存儲庫,他們贏了不一樣。 – 2012-04-12 16:02:39