1

時,任務並行庫我有,我想通過狀態下的擴展方法:傳「狀態」包裹APM風格

 // Overload 2 
    public static Task<TableQuerySegment<T>> ExecuteQuerySegmentedAsync<T> (this CloudTable tbl, TableQuery<T> query, TableContinuationToken continuationToken, CancellationToken token) where T : ITableEntity, new() 
    { 
     return tbl.ExecuteQuerySegmentedAsync<T>(query, continuationToken, null, null, token); 
    } 

    // Overload 5 
    public static Task<TableQuerySegment<TElement>> ExecuteQuerySegmentedAsync<TElement>(this CloudTable tbl, TableQuery<TElement> query, TableContinuationToken continuationToken, TableRequestOptions opt, OperationContext ctx, CancellationToken token) where TElement : ITableEntity, new() 
    { 
     ICancellableAsyncResult result = null; 

     if (opt == null && ctx == null) 
      result = tbl.BeginExecuteQuerySegmented<TElement>(query, continuationToken, null, tbl); 
     else 
      result = tbl.BeginExecuteQuerySegmented<TElement>(query, continuationToken, opt, ctx, null, tbl); 

     var cancellationRegistration = token.Register(result.Cancel); 

     return Task.Factory.FromAsync(result, iAsyncResult => 
     { 
      CloudTable currentTable = iAsyncResult.AsyncState as CloudTable; 

      cancellationRegistration.Dispose(); // todo: handle cleanup of this (deregistration) 
      return currentTable.EndExecuteQuerySegmented<TElement>(result); 
     }); 
    } 

    // Overload 3 
    public static Task<TableQuerySegment<R>> ExecuteQuerySegmentedAsync<T, R>(this CloudTable tbl, TableQuery<T> query, EntityResolver<R> resolver, TableContinuationToken continuationToken, CancellationToken token) where T : ITableEntity, new() 
    { 
     return tbl.ExecuteQuerySegmentedAsync<T, R>(query, resolver, continuationToken, null, null, token); 
    } 

    // Overload 6 
    public static Task<TableQuerySegment<R>> ExecuteQuerySegmentedAsync<TElement, R>(this CloudTable tbl, TableQuery<TElement> query, EntityResolver<R> resolver, TableContinuationToken continuationToken, TableRequestOptions opt, OperationContext ctx, CancellationToken token) where TElement : ITableEntity, new() 
    { 
     ICancellableAsyncResult result = null; 

     if (opt == null && ctx == null) 
      result = tbl.BeginExecuteQuerySegmented<TElement, R>(query, resolver, continuationToken, null, null); 
     else 
      result = tbl.BeginExecuteQuerySegmented<TElement, R>(query, resolver, continuationToken, opt, ctx, null, null); 

     var cancellationRegistration = token.Register(result.Cancel); 

     return Task.Factory.FromAsync(result, iAsyncResult => 
     { 
      CloudTable currentTable = iAsyncResult.AsyncState as CloudTable; 

      cancellationRegistration.Dispose(); // todo: handle cleanup of this (deregistration) 
      return currentTable.EndExecuteQuerySegmented<R>(result); 
     }); 
    } 

這樣我就可以這樣調用

  // prepare the query 
      trustsInBatchesOf100 = tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken); 

      // my state 
      object mystate = trustsInBatchesOf100.AsyncState; 
代碼

我被困在試圖弄清楚如何修改擴展方法以包含stateObject。難點是我在TaskFactory.FromAsync中找到了正確的過載,並在擴展方法中正確實現了它。

問題

如何修改上面的擴展方法正確接受的狀態參數?

更多的源代碼

private CloudTableClient tableClient; 
    private CloudStorageAccount account; 
    private TableRequestOptions opt; 

    List<Task<TableResult>> AllRunningTasks = null; 

    public async Task GetTrustsAndValues(string thingToSearchFor,string trustStartingPoint, 
     int depth, 
     TableQuery query,        
     OperationContext ctx, CancellationToken cancelToken) 
    { 
     TrustState asyncState = new TrustState() { ThingToSearchFor = thingToSearchFor, TimeStarted = DateTime.UtcNow }; 

     var ret = new TrustTree<string>(thingToSearchFor, "start", 10); 


     CloudTableClient client = account.CreateCloudTableClient(); 
     CloudTable tableSymmetricKeys = client.GetTableReference("SymmetricKeys5"); 

     // List<Task> taskList = new List<Task>(); 

     TableContinuationToken token = new TableContinuationToken() { }; 
     Task<TableQuerySegment<DynamicTableEntity>> trustsInBatchesOf100 = null; 

     while (true) 
     { 
      // prepare the query 
      trustsInBatchesOf100 = tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken); 

      object mystate = trustsInBatchesOf100.AsyncState; 

      // Run the method 
      trustsInBatchesOf100.Wait(); 

      // Create tasks for each of the 100 domains found. 
      IEnumerable<Task<TableResult>> getTrustDataQuery = 
          from domainData in trustsInBatchesOf100.Result select QueryPartnerForData(ref ret,domainData, thingToSearchFor, client, cancelToken); 

      // Save the count of lookups as appropriate 
      asyncState.RunningDirectQueries = getTrustDataQuery.Count(); 

      // Run up to 100 lookups on those domains. 
      var runningTasks = getTrustDataQuery.ToList(); 

      // Add running tasks to the full compilation of running tasks 
      AllRunningTasks.AddRange(runningTasks); 

      // Prepare for next iteration or quit 
      if (token == null) 
      { 
       break; 
      } 
      else 
      { 
       token = trustsInBatchesOf100.Result.ContinuationToken; 

       // todo: persist token token.WriteXml() 
      } 
     } 
    } 



    private static Task<TableResult> QueryPartnerForData(ref TrustTree<string> tree, DynamicTableEntity domainData, string thingToSearchFor, CloudTableClient client, CancellationToken cancelToken) 
    { 
     // Create the table client. 
     CloudTable tableDirectQuery = client.GetTableReference(String.Format("{0}_FW", domainData.RowKey)); 

     // Retrieve the entity with partition key of "Smith" and row key of "Jeff" 
     TableOperation tableOperation = TableOperation.Retrieve(domainData.RowKey, thingToSearchFor); 

     // Retrieve entity asynchronously 
     return tableDirectQuery.ExecuteAsync(tableOperation, cancelToken); 
    } 

    public async void SaveCompletedQueriesTo() 
    { 
     while (AllRunningTasks.Count > 0) 
     { 
      // Identify the first task that completes. 
      Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningTasks); 

      // ***Remove the selected task from the list so that you don't 
      // process it more than once. 
      AllRunningTasks.Remove(firstFinishedTask); 

      // Await the completed task. 
      var taskOfTableResult = await firstFinishedTask; 

      //todo: asyncState: need to know what the 
      // - target to save to is. 
     } 

     // TODO: if all tasks have completed then update 
    } 
+0

'Task.AsyncState'通常不用。你能解釋爲什麼你需要它嗎?最有可能有更好的方法來做你想做的事情,而不涉及使用'AsyncState'(或者'AsyncState',這是必要的)。 – svick

+0

@svick我正在使用分支了「樹」的任務進行遞歸。我正在考慮使用AsyncState來跟蹤進程的位置(樹的級別和分支)。正如你所看到的,我正在使用Azure Tables。我的目標是每輪「continuationTokens」都有相同的變量,但每次遞歸的狀態略有變化。如果您感興趣,我會發布更多源代碼... – LamonteCristo

+0

我在您的代碼中看不到遞歸,或者需要'AsyncState'。也許你可以簡化你的代碼,使它只包含與你的問題相關的部分?另外,要小心使用'Count()'方法來處理'getTrustDataQuery',因爲這意味着整個集合將迭代兩次。 – svick

回答

-1

你見過Microsoft.WindowsAzure.StorageClient.Async項目? (也是available on NuGet)。它可能提供一些想法,或已經解決了你的問題。否則,請在考慮合適的時候考慮對該項目進行修改。

+0

不知道爲什麼有這個答案的投票,但我也有一個同樣的git項目... https://github.com/makerofthings7/AzureStorageExtensions – LamonteCristo