時,任務並行庫我有,我想通過狀態下的擴展方法:傳「狀態」包裹APM風格
// Overload 2
public static Task<TableQuerySegment<T>> ExecuteQuerySegmentedAsync<T> (this CloudTable tbl, TableQuery<T> query, TableContinuationToken continuationToken, CancellationToken token) where T : ITableEntity, new()
{
return tbl.ExecuteQuerySegmentedAsync<T>(query, continuationToken, null, null, token);
}
// Overload 5
public static Task<TableQuerySegment<TElement>> ExecuteQuerySegmentedAsync<TElement>(this CloudTable tbl, TableQuery<TElement> query, TableContinuationToken continuationToken, TableRequestOptions opt, OperationContext ctx, CancellationToken token) where TElement : ITableEntity, new()
{
ICancellableAsyncResult result = null;
if (opt == null && ctx == null)
result = tbl.BeginExecuteQuerySegmented<TElement>(query, continuationToken, null, tbl);
else
result = tbl.BeginExecuteQuerySegmented<TElement>(query, continuationToken, opt, ctx, null, tbl);
var cancellationRegistration = token.Register(result.Cancel);
return Task.Factory.FromAsync(result, iAsyncResult =>
{
CloudTable currentTable = iAsyncResult.AsyncState as CloudTable;
cancellationRegistration.Dispose(); // todo: handle cleanup of this (deregistration)
return currentTable.EndExecuteQuerySegmented<TElement>(result);
});
}
// Overload 3
public static Task<TableQuerySegment<R>> ExecuteQuerySegmentedAsync<T, R>(this CloudTable tbl, TableQuery<T> query, EntityResolver<R> resolver, TableContinuationToken continuationToken, CancellationToken token) where T : ITableEntity, new()
{
return tbl.ExecuteQuerySegmentedAsync<T, R>(query, resolver, continuationToken, null, null, token);
}
// Overload 6
public static Task<TableQuerySegment<R>> ExecuteQuerySegmentedAsync<TElement, R>(this CloudTable tbl, TableQuery<TElement> query, EntityResolver<R> resolver, TableContinuationToken continuationToken, TableRequestOptions opt, OperationContext ctx, CancellationToken token) where TElement : ITableEntity, new()
{
ICancellableAsyncResult result = null;
if (opt == null && ctx == null)
result = tbl.BeginExecuteQuerySegmented<TElement, R>(query, resolver, continuationToken, null, null);
else
result = tbl.BeginExecuteQuerySegmented<TElement, R>(query, resolver, continuationToken, opt, ctx, null, null);
var cancellationRegistration = token.Register(result.Cancel);
return Task.Factory.FromAsync(result, iAsyncResult =>
{
CloudTable currentTable = iAsyncResult.AsyncState as CloudTable;
cancellationRegistration.Dispose(); // todo: handle cleanup of this (deregistration)
return currentTable.EndExecuteQuerySegmented<R>(result);
});
}
這樣我就可以這樣調用
// prepare the query
trustsInBatchesOf100 = tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken);
// my state
object mystate = trustsInBatchesOf100.AsyncState;
代碼
我被困在試圖弄清楚如何修改擴展方法以包含stateObject
。難點是我在TaskFactory.FromAsync中找到了正確的過載,並在擴展方法中正確實現了它。
問題
如何修改上面的擴展方法正確接受的狀態參數?
更多的源代碼
private CloudTableClient tableClient;
private CloudStorageAccount account;
private TableRequestOptions opt;
List<Task<TableResult>> AllRunningTasks = null;
public async Task GetTrustsAndValues(string thingToSearchFor,string trustStartingPoint,
int depth,
TableQuery query,
OperationContext ctx, CancellationToken cancelToken)
{
TrustState asyncState = new TrustState() { ThingToSearchFor = thingToSearchFor, TimeStarted = DateTime.UtcNow };
var ret = new TrustTree<string>(thingToSearchFor, "start", 10);
CloudTableClient client = account.CreateCloudTableClient();
CloudTable tableSymmetricKeys = client.GetTableReference("SymmetricKeys5");
// List<Task> taskList = new List<Task>();
TableContinuationToken token = new TableContinuationToken() { };
Task<TableQuerySegment<DynamicTableEntity>> trustsInBatchesOf100 = null;
while (true)
{
// prepare the query
trustsInBatchesOf100 = tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken);
object mystate = trustsInBatchesOf100.AsyncState;
// Run the method
trustsInBatchesOf100.Wait();
// Create tasks for each of the 100 domains found.
IEnumerable<Task<TableResult>> getTrustDataQuery =
from domainData in trustsInBatchesOf100.Result select QueryPartnerForData(ref ret,domainData, thingToSearchFor, client, cancelToken);
// Save the count of lookups as appropriate
asyncState.RunningDirectQueries = getTrustDataQuery.Count();
// Run up to 100 lookups on those domains.
var runningTasks = getTrustDataQuery.ToList();
// Add running tasks to the full compilation of running tasks
AllRunningTasks.AddRange(runningTasks);
// Prepare for next iteration or quit
if (token == null)
{
break;
}
else
{
token = trustsInBatchesOf100.Result.ContinuationToken;
// todo: persist token token.WriteXml()
}
}
}
private static Task<TableResult> QueryPartnerForData(ref TrustTree<string> tree, DynamicTableEntity domainData, string thingToSearchFor, CloudTableClient client, CancellationToken cancelToken)
{
// Create the table client.
CloudTable tableDirectQuery = client.GetTableReference(String.Format("{0}_FW", domainData.RowKey));
// Retrieve the entity with partition key of "Smith" and row key of "Jeff"
TableOperation tableOperation = TableOperation.Retrieve(domainData.RowKey, thingToSearchFor);
// Retrieve entity asynchronously
return tableDirectQuery.ExecuteAsync(tableOperation, cancelToken);
}
public async void SaveCompletedQueriesTo()
{
while (AllRunningTasks.Count > 0)
{
// Identify the first task that completes.
Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningTasks);
// ***Remove the selected task from the list so that you don't
// process it more than once.
AllRunningTasks.Remove(firstFinishedTask);
// Await the completed task.
var taskOfTableResult = await firstFinishedTask;
//todo: asyncState: need to know what the
// - target to save to is.
}
// TODO: if all tasks have completed then update
}
'Task.AsyncState'通常不用。你能解釋爲什麼你需要它嗎?最有可能有更好的方法來做你想做的事情,而不涉及使用'AsyncState'(或者'AsyncState',這是必要的)。 – svick
@svick我正在使用分支了「樹」的任務進行遞歸。我正在考慮使用AsyncState來跟蹤進程的位置(樹的級別和分支)。正如你所看到的,我正在使用Azure Tables。我的目標是每輪「continuationTokens」都有相同的變量,但每次遞歸的狀態略有變化。如果您感興趣,我會發布更多源代碼... – LamonteCristo
我在您的代碼中看不到遞歸,或者需要'AsyncState'。也許你可以簡化你的代碼,使它只包含與你的問題相關的部分?另外,要小心使用'Count()'方法來處理'getTrustDataQuery',因爲這意味着整個集合將迭代兩次。 – svick