我是NEventStore和一般事件採購的新手。在一個項目中,我想使用NEventStore來持久化由我們的聚合生成的事件,但是我有一些問題需要正確處理併發。NEventStore樂觀鎖
如何使用樂觀鎖寫入相同的流?
比方說,我有2個實例從2個不同的線程在修訂1加載相同的聚合。然後是第一個線程調用命令A和第二個線程調用命令B.使用樂觀鎖定其中一個聚合應該失敗,併發異常。
我以爲使用maxRevision從加載聚集的角度打開流,但似乎CommitChanges永遠不會失敗,如果我通過舊版本。
我失蹤了什麼?使用NEventStore /事件採購時,樂觀鎖定可能/正確嗎?
這裏是我用來再現該問題的代碼:
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (var scope = new TransactionScope())
using (store = WireupEventStore())
{
Client1(revision: 0);
Client2(revision: 0);
scope.Complete();
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.UsingInMemoryPersistence()
.Build();
}
private static void Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
我希望客戶端2失敗,因爲我用舊版本打開流。
UPDATE 26/08/2013: 我已經測試了相同的代碼,使用Sql服務器,似乎按預期工作。
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (store = WireupEventStore())
{
OpenOrCreateStream();
AppendToStream_Client1(revision: 1);
AppendToStream_Client2(revision: 1); // throws an error
// AppendToStream_Client2(revision: 2); // works
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.LogToOutputWindow()
.UsingInMemoryPersistence()
.UsingSqlPersistence("EventStore") // Connection string is in app.config
.WithDialect(new MsSqlDialect())
.InitializeStorageEngine()
.UsingJsonSerialization()
.Build();
}
private static void OpenOrCreateStream()
{
using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
{
var @event = new SomeDomainEvent { Value = "Initial event." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 2." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
所以回到我的問題:要啓用樂觀鎖定,我應該使用修訂時打開流?還有其他可能的實現或準則?
感謝
謝謝達米安。但我不確定要理解。假設我刪除了TransactionScope。有可能處理樂觀鎖?怎麼樣?基本上我只想在這段時間內沒有其他事件被提交時寫入流。 –
我已經使用SQL Server更新了這個問題並且沒有事務。如果我傳遞錯誤的修訂,現在第二個附加失敗。這是處理這種情況的正確方法嗎?在這種情況下,我應該將修訂保存在聚合狀態中,並在保存新事件時將其傳回。這是預期的實施? –
樂觀鎖定由每個持久性引擎處理。在SQL中,它基於StreamId和CommitSequence的主鍵。因此,如果您同時打開同一個流兩次,則向兩者添加一個提交,這將導致CommitSequence衝突和一個ConcurrencyException。 – 2013-08-27 11:28:19