唯一的解決方案是copeperate - 處理方必須告知您該項目已處理。的實現是非常簡單 - 一個方法是這樣的:
public struct SignalizableItem<T>
{
private readonly T _value;
private readonly TaskCompletionSource<object> _signaller;
public SignalizableItem(T value, TaskCompletionSource<object> signaller)
{
_value = value;
_signaller = signaller;
}
public void Process(Action<T> action)
{
try
{
action(_value);
_signaller.SetResult(default(object));
}
catch (Exception ex)
{
_signaller.SetException(ex);
}
}
}
public static class BlockingCollectionExtensions
{
public static Task QueueAndWaitAsync<T>
(this BlockingCollection<SignalizableItem<T>> @this, T value)
{
var tcs = new TaskCompletionSource<object>();
@this.Add(new SignalizableItem<T>(value, tcs));
return tcs.Task;
}
}
用法很簡單 - 在製片方,你根本就
await collection.QueueAndWaitAsync(value);
在消費者方面,你會解開價值和信號準備時:
var item = collection.Take();
item.Process
(
data =>
{
// Your processing
...
}
);
,當然還有,收集將BlockingCollection<SignalizableItem<YourType>>
,而不是BlockingCollection<YourType>
。
你可以進一步通過添加另一個擴展方法簡化處理:
public static void Process<T>
(this BlockingCollection<SignalizableItem<T>> @this, Action<T> action)
{
@this.Take().Process(action);
}
它也可能是實現消除(簡單CancellationToken
應該能正常運行)或關機的另一種形式是個好主意。
東西實際可用可與
public static void ProcessAll<T>
(this BlockingCollection<SignalizableItem<T>> @this, Action<T> action,
CancellationToken cancellationToken)
{
SignalizableItem<T> val;
while (@this.TryTake(out val, -1, cancellationToken)) val.Process(action);
}
抽象掉整個處理機制,並暴露只是簡單的動作代表結束。
只是出於好奇:你爲什麼要等待?如果你需要同步處理一個項目,爲什麼你使用'BlockingCollection'?您可以在呼叫站點處理該項目,而不是將其添加到集合中。也許我在這裏錯過了一些東西。 – xxbbcc
我懷疑這是[將基於回調的異步方法轉換爲等待任務]的重複(http://stackoverflow.com/questions/11879967/best-way-to-convert-callback-based-async-method-to-awaitable -任務)。 –
@AlexeiLevenkov:這個'TryAdd'回調是基於什麼? – olydis