2016-11-02 58 views
0

我正在擺弄Azure函數,將它與CQRS和事件源相結合。我正在使用Azure表存儲作爲事件存儲。下面的代碼是一個簡化版本,不會分散問題。如何使用IQueryable和IAsyncCollector鎖定Azure函數中的Azure表分區?

我對任何代碼提示都不感興趣,因爲這不是代碼的最終版本。

public static async Task Run(BrokeredMessage commandBrokeredMessage, IQueryable<DomainEvent> eventsQueryable, IAsyncCollector<IDomainEvent> eventsCollector, TraceWriter log) 
{ 
    var command = commandBrokeredMessage.GetBody<FooCommand>(); 
    var committedEvents = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId); 
    var expectedVersion = committedEvents .Max(e => e.Version); 

    // some domain logic that will result in domain events 
    var uncommittedEvents = HandleFooCommand(command, committedEvents); 

    // using(Some way to lock partition) 
    // { 
    var currentVersion = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId).Max(e => e.Version); 

    if(expectedVersion != currentVersion) 
    { 
     throw new ConcurrencyException("expected version is not the same as current version"); 
    } 

    var i = currentVersion; 
    foreach (var domainEvent in uncommittedEvents.OrderBy(e => e.Timestamp)) 
    { 
     i++; 
     domainEvent.Version = i; 
     await eventsCollector.AddAsync(domainEvent); 
    } 
    // } 
} 

public class DomainEvent : TableEntity 
{ 
    private string eventType; 

    public virtual string EventType 
    { 
     get { return eventType ?? (eventType = GetType().UnderlyingSystemType.Name); } 
     set { eventType = value; } 
    } 

    public long Version { get; set; } 
} 

我的努力

爲了公平起見,我不能嘗試任何事情,因爲我不知道從哪裏開始,如果這甚至有可能。 Id做了一些研究,但沒有解決我的問題,但可以幫助你解決這個問題。

Azure Tables是否支持鎖定? 是的,他們這樣做:Managing Concurrency in Microsoft Azure Storage。這被稱爲租賃,但我不知道如何在Azure功能中實現此功能。

其他來源

提示,建議,方案

我總是願意就如何解決問題的任何建議,但我不能接受這些作爲我的問題的答案。除非我的問題的答案是「否」,否則我不能將其他選擇作爲答案。我不是在尋求解決我的問題的最佳方法,我希望它按我設計的方式工作。我知道這是固執的,但這是練習/擺弄。

回答

1

Blob租約確實可以很好地完成您想要完成的任務(Functions運行時實際上在內部廣泛使用)。

如果在使用分區之前,您在blob上獲得租約(按照慣例,以分區命名的blob或類似的東西),您將能夠確保只有給定的函數正在處理劃分。

您鏈接到的文章確實顯示了租賃獲取和發佈的示例,您可以在documentation中找到更多信息。要確保

的一件事是你刷新你收集你離開鎖定範圍以前(通過調用FlushAsync它)

我希望這有助於!

+0

能否詳細說明一下?我無法在ICollector上找到Flush方法。您是否假設我正在使用Azure Blob存儲而不是Azure表存儲,或者您是否建議我應該將這兩者結合起來。在使用表存儲用於存儲我的數據和blob存儲鎖定?如果是這樣,你可以添加一個代碼示例如何在Azure函數中工作? – annemartijn

+1

ICollector有一個Flush,你在上面使用的是(IAsyncCollector)有一個FlushAsync(我更新了匹配的答案)。是的,你知道了。我正在描述一個流程,在您對錶格執行操作之前,您將獲得一個blob租約充當鎖定。 –

+0

我不得不將Microsoft.Azure.WebJobs更新到2.0.0-beta1以獲取'FlushAsync()'方法。我已經創建了一個要點,你能告訴我這是否可行嗎? https://gist.github.com/annemartijn0/03bf4020aabe24abeeb97e2edba665c1 – annemartijn