2013-08-25 148 views
3

我是NEventStore和一般事件採購的新手。在一個項目中,我想使用NEventStore來持久化由我們的聚合生成的事件,但是我有一些問題需要正確處理併發。NEventStore樂觀鎖

如何使用樂觀鎖寫入相同的流?

比方說,我有2個實例從2個不同的線程在修訂1加載相同的聚合。然後是第一個線程調用命令A和第二個線程調用命令B.使用樂觀鎖定其中一個聚合應該失敗,併發異常。

我以爲使用maxRevision從加載聚集的角度打開流,但似乎CommitChanges永遠不會失敗,如果我通過舊版本。

我失蹤了什麼?使用NEventStore /事件採購時,樂觀鎖定可能/正確嗎?

這裏是我用來再現該問題的代碼:

namespace NEventStore.Example 
{ 
    using System; 
    using System.Transactions; 
    using NEventStore; 
    using NEventStore.Dispatcher; 
    using NEventStore.Persistence.SqlPersistence.SqlDialects; 

    internal static class MainProgram 
    { 
     private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier 
     private static IStoreEvents store; 

     private static void Main() 
     { 
      using (var scope = new TransactionScope()) 
      using (store = WireupEventStore()) 
      { 
       Client1(revision: 0); 

       Client2(revision: 0); 

       scope.Complete(); 
      } 

      Console.WriteLine(Resources.PressAnyKey); 
      Console.ReadKey(); 
     } 

     private static IStoreEvents WireupEventStore() 
     { 
      return Wireup.Init() 
       .UsingInMemoryPersistence() 
       .Build(); 
     } 

     private static void Client1(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, 0, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Client 1 - event 1." }; 

       stream.Add(new EventMessage { Body = @event }); 


       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 

     private static void Client2(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, 0, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Client 2 - event 1." }; 

       stream.Add(new EventMessage { Body = @event }); 


       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 
    } 
} 

我希望客戶端2失敗,因爲我用舊版本打開流。

UPDATE 26/08/2013: 我已經測試了相同的代碼,使用Sql服務器,似乎按預期工作。

namespace NEventStore.Example 
{ 
    using System; 
    using System.Transactions; 
    using NEventStore; 
    using NEventStore.Dispatcher; 
    using NEventStore.Persistence.SqlPersistence.SqlDialects; 

    internal static class MainProgram 
    { 
     private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier 
     private static IStoreEvents store; 

     private static void Main() 
     { 
      using (store = WireupEventStore()) 
      { 
       OpenOrCreateStream(); 

       AppendToStream_Client1(revision: 1); 

       AppendToStream_Client2(revision: 1); // throws an error 
       // AppendToStream_Client2(revision: 2); // works 
      } 

      Console.WriteLine(Resources.PressAnyKey); 
      Console.ReadKey(); 
     } 

     private static IStoreEvents WireupEventStore() 
     { 
      return Wireup.Init() 
       .LogToOutputWindow() 
       .UsingInMemoryPersistence() 
       .UsingSqlPersistence("EventStore") // Connection string is in app.config 
        .WithDialect(new MsSqlDialect()) 
        .InitializeStorageEngine() 
        .UsingJsonSerialization() 
       .Build(); 
     } 

     private static void OpenOrCreateStream() 
     { 
      using (var stream = store.OpenStream(StreamId, 0, int.MaxValue)) 
      { 
       var @event = new SomeDomainEvent { Value = "Initial event." }; 

       stream.Add(new EventMessage { Body = @event }); 
       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 

     private static void AppendToStream_Client1(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, int.MinValue, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Second event 1." }; 

       stream.Add(new EventMessage { Body = @event }); 
       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 

     private static void AppendToStream_Client2(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, int.MinValue, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Second event 2." }; 

       stream.Add(new EventMessage { Body = @event }); 
       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 
    } 
} 

所以回到我的問題:要啓用樂觀鎖定,我應該使用修訂時打開流?還有其他可能的實現或準則?

感謝

回答

6

首先,在內存中的持久性實現,其主要目的是測試,是不是事務感知。在您的原始示例中,客戶端2將簡單地將其事件附加到流中。嘗試使用支持事務的持久性存儲(SQL & Raven,但不是Mongo)來運行上述操作。

其次,當打開一個流用於不同的目的指定的最小/最大修訂:

  1. 當重新水合的聚集體,並且沒有快照是可用的,應指定(分:0,最大:int.MaxValue),因爲你有興趣檢索所有事件。
  2. 重新提供聚合和快照時,您可以指定(min:snapshot.Version,max:int.MaxValue)來獲取自快照以來發生的所有事件。
  3. 保存聚合時,可以指定(min:0,max:Aggregate.Version)。 Aggregate.Version是在重新水合過程中獲得的。如果同一個骨料在同一時間在其他地方重新補充並保存,則會出現競賽狀況,並且會發生ConcurrencyException

大部分支持將被封裝在域框架中。請參閱普通域中的AggregateBaseEventStoreRepository

第三,最重要的是,在單個事務中更新> 1個流是一種代碼異味。如果您正在執行DDD/ES,則流代表單個聚合根,根據定義,它是一致性邊界。在事務中創建/更新多個AR可以打破這一點。 NEventStore的事務處理支持被(不情願地)添加了,因此它可以與其他工具一起工作,即事務性地從MSMQ/NServiceBus/whatever讀取命令並處理它,或者事務性地將提交消息分派給隊列並將其標記爲這樣。就個人而言,我會建議你儘量避免2PC。

+0

謝謝達米安。但我不確定要理解。假設我刪除了TransactionScope。有可能處理樂觀鎖?怎麼樣?基本上我只想在這段時間內沒有其他事件被提交時寫入流。 –

+0

我已經使用SQL Server更新了這個問題並且沒有事務。如果我傳遞錯誤的修訂,現在第二個附加失敗。這是處理這種情況的正確方法嗎?在這種情況下,我應該將修訂保存在聚合狀態中,並在保存新事件時將其傳回。這是預期的實施? –

+0

樂觀鎖定由每個持久性引擎處理。在SQL中,它基於StreamId和CommitSequence的主鍵。因此,如果您同時打開同一個流兩次,則向兩者添加一個提交,這將導致CommitSequence衝突和一個ConcurrencyException。 – 2013-08-27 11:28:19