2015-08-31 14 views
0

我有一個具有blob和隊列觸發器的Azure WebKob以將數據保存到Azure DocumentDb。如何正確地限制WebJobs對DocumentDb的訪問

不時我得到一個錯誤:

Microsoft.Azure.Documents.RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}

目前我掐死使用此代碼的請求。一個WebJob功能:

public async Task ParseCategoriesFromCsv(...) 
{ 
    double find = 2.23, add = 5.9, replace = 10.67; 
    double requestCharge = Math.Round(find + Math.Max(add, replace)); 

    await categoryProvider.SaveCategories(requestCharge , categories); 
} 

類別供應商操作文檔數據庫客戶端:

public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories) 
{ 
    var requestDelay = TimeSpan.FromSeconds(60.0/(collectionOptions.RequestUnits/requestCharge)); 

    var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx 

    var client = new DocumentClient(endpoint, authorizationKey, 
     new ConnectionPolicy 
     { 
      ConnectionMode = documentDbOptions.ConnectionMode, 
      ConnectionProtocol = documentDbOptions.ConnectionProtocol 
     }); 

    return await Task.WhenAll(documents.Select(async d => 
     await scheduler.ScheduleTask(
      () => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d)))); 
} 

任務調度程序油門/措施/同步請求:

private readonly Subject<Action> _requests = new Subject<Action>(); 
private readonly IDisposable _observable; 

public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler) 
{ 
    _observable = _requests.Select(i => Observable.Empty<Action>() 
                .Delay(requestDelay) 
                .StartWith(i)) 
          .Concat() 
          .ObserveOn(scheduler) 
          .Subscribe(action => action()); 
} 

public Task<T> ScheduleTask<T>(Func<Task<T>> request) 
{ 
    var tcs = new TaskCompletionSource<T>(); 
    _requests.OnNext(async() => 
    { 
     try 
     { 
      T result = await request(); 
      tcs.SetResult(result); 
     } 
     catch (Exception ex) 
     { 
      tcs.SetException(ex); 
     } 
    }); 
    return tcs.Task; 
} 

所以它基本上是一個數常量來自ResourceResponse<Document>.RequestCharge但是:

  • 當我有1隊列觸發它可以正常工作,但當8隊列它會引發錯誤。
  • 如果增加請求費用8次然後8個隊列工作正常,但只有1個工程慢8倍。

什麼節流/測量/同步機制可以在這裏很好地工作?

+0

您的代碼非常複雜。你已經有了課程的片段,你將Rx與TPL混合在一起。我認爲有一點簡單的Rx代碼可以在幾行代碼中完成你所需要的。您能否讓我們知道您正在執行的核心操作以及班級簽名是什麼? – Enigmativity

+0

@Enigmativity:我害怕過分複雜化我的問題。所以實際上它至少有4個不同的類,非常薄,並且由一個容器連接的接口覆蓋。 – abatishchev

+0

你絕對是讓它變得複雜。採用正確的結構將有助於我們。 – Enigmativity

回答

2

啓動.NET SDK 1.8.0後,我們會自動處理請求率過大的異常到合理的程度(默認情況下會重試9次,並在從服務器返回後重試下一次重試)。

如果您需要更好的控制,您可以在傳遞給DocumentClient對象的ConnectionPolicy實例上配置RetryOptions,並使用它覆蓋默認重試策略。

因此,您不再需要添加任何自定義邏輯來處理上述應用程序代碼中的429個異常。

1

當得到429(請求率太大)時,響應會告訴您需要等待多長時間。有一個標題x-ms-retry-after。這有一個價值。以毫秒爲單位等待這段時間。

catch (AggregateException ex) when (ex.InnerException is DocumentClientException) 
{ 
    DocumentClientException dce = (DocumentClientException)ex.InnerException; 
    switch ((int)dce.StatusCode) 
    { 
     case 429: 
      Thread.Sleep(dce.RetryAfter); 
      break; 

     default: 
      Console.WriteLine(" Failed: {0}", ex.InnerException.Message); 
      throw; 
    }      
} 
+0

其實這是很酷的解決方法。但我想嘗試寫一個更精確的調度程序,而不是將所有內容放在一起,然後處理db踢回我的請求,並無限次地重複它們,直到它們全部通過:) – abatishchev

+0

與其一次性全部解除它們,你能用一個線程從這些隊列中拉出來,那麼你可以使用429機制?這就是我處理它的方式。 –

+0

@Larry:你可以分享一些代碼或配置嗎? – abatishchev

1

在我看來,你應該能夠做到這與你的SaveCategories方法,使其與的Rx的工作很好:

public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories) 
{ 
    var requestDelay = TimeSpan.FromSeconds(60.0/(collectionOptions.RequestUnits/requestCharge)); 

    var client = new DocumentClient(endpoint, authorizationKey, 
     new ConnectionPolicy 
     { 
      ConnectionMode = documentDbOptions.ConnectionMode, 
      ConnectionProtocol = documentDbOptions.ConnectionProtocol 
     }); 

    return 
     Observable.Interval(requestDelay) 
      .Zip(documents, (delay, doc) => doc) 
      .SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc))) 
      .ToArray(); 
} 

這完全擺脫你的IntervalTaskScheduler類,並確保您將請求速率限制爲每個requestDelay時間跨度的一個請求,但允許響應按需要進行。通過調用.ToArray()來調用IObservable<ResourceResponse<Document>>,該函數將許多值返回到IObservable<ResourceResponse<Document>[]>,該值在observable完成時返回單個值的數組。

我無法測試你的代碼,所以我測試的樣本,我想模仿你的代碼:

var r = new Random(); 
var a = Enumerable.Range(0, 1000); 
var i = Observable.Interval(TimeSpan.FromSeconds(2.0)); 

var sw = Stopwatch.StartNew(); 

var query = 
    i.Zip(a, (ii, aa) => aa) 
     .SelectMany(aa => Observable.Start(() => 
     { 
      var x = sw.Elapsed.TotalMilliseconds; 
      Thread.Sleep(r.Next(0, 5000)); 
      return x; 
     })) 
     .Select(x => new 
     { 
      started = x, 
      ended = sw.Elapsed.TotalMilliseconds 
     }); 

我得到這種結果這表明要求被節流:

4026.2983 5259.7043 
2030.1287 6940.2326 
6027.0439 9664.1045 
8027.9993 10207.0579 
10028.1762 12301.4746 
12028.3190 12711.4440 
14040.7972 17433.1964 
16040.9267 17574.5924 
18041.0529 19077.5545 
+0

根據我的理解'Observable.Timer()'在給定延遲後產生一個單值。 – abatishchev

+0

對不起,你是對的。這應該是'時間間隔'。我會盡快解決。 – Enigmativity

+0

你如何開始'IObservable '?對不起,愚蠢的問題,但我對Rx很新。 – abatishchev