2017-01-11 46 views
4

將代碼放在我的數據訪問類中。使用反應式擴展重試異步任務代碼

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null, 
      CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null) 
     { 
      using (var connection = Connection) 
      { 
       var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault); 
       Task<IEnumerable<TEntity>> queryTask = 
        connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction, 
         commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token)); 
       IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false); 
       connection.Close(); 
       connection.Dispose(); 
       tokenSource.Dispose(); 
       return data; 
      } 
     } 

我想要一個SqlExeption拋出重試一次。請記住,我不能將RX應用於應用程序,但僅限於此代碼塊。

我嘗試下面的代碼,它看起來是正確執行和Do被記錄在控制檯輸出,但並沒有真正調用Catch處理程序,如果Retry處理程序執行,以及我不知道。

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null, 
      CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null) 
     { 
      return await Observable.Defer(async() => 
      { 
       using (var connection = Connection) 
       { 
        var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault); 
        Task<IEnumerable<TEntity>> queryTask = 
         connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction, 
          commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token)); 
        IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false); 
        connection.Close(); 
        connection.Dispose(); 
        tokenSource.Dispose(); 
        return Observable.Return(data); 
       } 
      }) 
      .Catch<IEnumerable<TEntity>, SqlException>(source => 
      { 
       Debug.WriteLine($"QueryAsync Exception {source}"); 
       return Observable.Return(new List<TEntity>()); 
      }) 
      .Throttle(TimeSpan.FromMilliseconds(500)) 
      .Retry(1) 
      .Do(_ => Debug.WriteLine("Do QueryAsync")); 
     } 

回答

6

我可以看到你的代碼的幾個潛在的問題:

  • 獨立於主邏輯重試邏輯,在一個名爲QueryWithRetryAsync例如方法。這只是一個設計問題,但問題仍然存在
  • 不要Catch直到Retry。否則SqlException將導致空列表和Retry運營商將永遠不會看到異常
  • 我不認爲Throttle是不必要的,因爲你永遠只想到通過管道
  • Retry(1)沒有做一個價值你認爲它的確如此(這對我來說也是一個驚喜)。看來的「重試」的定義包括第一次調用,所以你需要Retry(2)

這裏是表現你所希望的方式獨立例如:改變重試次數爲2的工作就像一個

class Program 
{ 
    static void Main(string[] args) 
    { 
     var pipeline = Observable 
      .Defer(() => DoSomethingAsync().ToObservable()) 
      .Retry(2) 
      .Catch<string, InvalidOperationException>(ex => Observable.Return("default")); 

     pipeline 
      .Do(Console.WriteLine) 
      .Subscribe(); 

     Console.ReadKey(); 
    } 

    private static int invocationCount = 0; 

    private static async Task<string> DoSomethingAsync() 
    { 
     Console.WriteLine("Attempting DoSomethingAsync"); 

     await Task.Delay(TimeSpan.FromSeconds(2)); 

     ++invocationCount; 

     if (invocationCount == 2) 
     { 
      return "foo"; 
     } 

     throw new InvalidOperationException(); 
    } 
} 
+0

魅力! –