2013-02-15 98 views
10

我有一個異步謂詞方法是這樣的:如何使用「Where」與異步謂詞?

​​

說我有Uri個集合:

var addresses = new[] 
{ 
    new Uri("http://www.google.com/"), 
    new Uri("http://www.stackoverflow.com/") //etc. 
}; 

我想用MeetsCriteria過濾addresses。我想要異步執行此操作;我想要對謂詞進行多次調用以異步運行,然後我想等待所有這些調用完成並生成過濾結果集。不幸的是,LINQ似乎並不支持異步謂詞,所以像這樣工作:

var filteredAddresses = addresses.Where(MeetsCriteria); 

是否有同樣方便的方式來做到這一點?

+2

如果支持這項功能,您會發生什麼?特別是當迭代'filteredAddresses'時,實際調用'MeetsCriteria'。 – 2013-02-15 07:50:04

+0

@DanielHilgarth:謝謝;那是個很好的觀點。這似乎並不適合LINQ。 – Sam 2013-02-17 22:27:53

回答

6

我認爲原因沒有這樣一個框架中的是,有很多可能的變化,每個選擇會在某些情況下是正確的:

  • 應該謂詞並行執行,或串聯?
    • 如果它們並行執行,它們是否應該全部執行,還是應該限制並行度?
    • 如果它們並行執行,結果應該與原始集合的順序相同,按完成順序還是未定義順序?
      • 如果他們應該按照完成順序返回,是否應該有某種方式(異步)在完成時獲取結果? (這將需要返回類型從Task<IEnumerable<T>>到別的變化。)

你說你想要的謂詞並行執行。在這種情況下,最簡單的選擇是在一次執行所有這些,在完成的順序返回:

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate) 
{ 
    var results = new ConcurrentQueue<T>(); 
    var tasks = source.Select(
     async x => 
     { 
      if (await predicate(x)) 
       results.Enqueue(x); 
     }); 
    await Task.WhenAll(tasks); 
    return results; 
} 

然後,您可以使用這樣的:

var filteredAddresses = await addresses.Where(MeetsCriteria); 
+1

我會使用不同的方法名稱,所以不同的語義(特別是重新排序)變得清晰。 – CodesInChaos 2013-02-15 13:02:49

+0

@CodesInChaos可能,但是我不確定什麼是好名字。 'AsyncParallelWhereOrderedByCompletion()'會描述這個方法的作用,但這是一個糟糕的名字。 – svick 2013-02-15 13:08:30

+0

也許像'ConcurrentlyFilterAsync'這樣的名字是合適的。 – Sam 2013-02-17 22:30:11

5

第一種方法:問題的所有先後請求預先請求,然後等待所有請求返回,然後過濾結果。 (svick的代碼也是這樣做的,但在這裏我沒有使用ConcurrentQueue)。

// First approach: massive fan-out 
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) }); 
var addressesAndCriteria = await Task.WhenAll(tasks); 
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A); 

第二種方法:一個接一個地執行請求。這將需要更長的時間,但它會確保不與請求的巨大沖擊錘的web服務(假設MeetsCriteriaAsync出去一個web服務...)

// Second approach: one by one 
var filteredAddresses = new List<Uri>(); 
foreach (var a in filteredAddresses) 
{ 
    if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a); 
} 

第三種方法:對於第二,但使用一個假想的C#8特性「異步流」。 C#8還沒有出來,異步流還沒有設計,但我們可以做夢! IAsyncEnumerable類型已經存在於RX中,並且希望它們會爲它添加更多的組合器。關於IAsyncEnumerable的好處在於,我們可以在開始使用前幾個filteredAddresses時立即開始消費,而不是等待所有要先過濾的東西。

// Third approach: ??? 
IEnumerable<Uri> addresses = {...}; 
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync); 

第四種方法:也許我們不想一下子錘所有請求的web服務,但我們很樂意向在同一時間超過一個請求。也許我們做了實驗,發現「一次三個」是一個快樂的媒介。注意:此代碼假設單線程執行上下文,如在UI編程或ASP.NET中。如果它在多線程執行上下文中運行,那麼它需要一個ConcurrentQueue和ConcurrentList。

// Fourth approach: throttle to three-at-a-time requests 
var addresses = new Queue<Uri>(...); 
var filteredAddresses = new List<Uri>(); 
var worker1 = FilterAsync(addresses, filteredAddresses); 
var worker2 = FilterAsync(addresses, filteredAddresses); 
var worker3 = FilterAsync(addresses, filteredAddresses); 
await Task.WhenAll(worker1, worker2, worker3); 

async Task FilterAsync(Queue<Uri> q, List<Uri> r) 
{ 
    while (q.Count > 0) 
    { 
    var item = q.Dequeue(); 
    if (await MeetsCriteriaAsync(item)) r.Add(item); 
    } 
} 

對於使用TPL數據流庫的第四種方法也有一些辦法。