2015-09-01 36 views
1

我正在使用BlockingCollection創建一個隊列,其中多個線程可以將項目添加到單獨的線程處理的項目中。添加的項目還包含一個回調函數(委託),該項目在處理項目後調用。要添加項目,我使用TryAdd方法。等待添加到BlockingCollection

這一切都工作正常,但知道我想知道是否有任何方式來運行TryAdd後處理await。 我想要的是,添加的線程等待處理完成,然後繼續。

我希望我能很好地描述我的問題,這樣任何人都可以給我一個提示。

我的目標框架是Mono/.Net4.5。

+6

只是出於好奇:你爲什麼要等待?如果你需要同步處理一個項目,爲什麼你使用'BlockingCollection'?您可以在呼叫站點處理該項目,而不是將其添加到集合中。也許我在這裏錯過了一些東西。 – xxbbcc

+0

我懷疑這是[將基於回調的異步方法轉換爲等待任務]的重複(http://stackoverflow.com/questions/11879967/best-way-to-convert-callback-based-async-method-to-awaitable -任務)。 –

+0

@AlexeiLevenkov:這個'TryAdd'回調是基於什麼? – olydis

回答

1

唯一的解決方案是copeperate - 處理方必須告知您該項目已處理。的實現是非常簡單 - 一個方法是這樣的:

public struct SignalizableItem<T> 
{ 
    private readonly T _value; 
    private readonly TaskCompletionSource<object> _signaller; 

    public SignalizableItem(T value, TaskCompletionSource<object> signaller) 
    { 
    _value = value; 
    _signaller = signaller; 
    } 

    public void Process(Action<T> action) 
    { 
    try 
    { 
     action(_value); 
     _signaller.SetResult(default(object)); 
    } 
    catch (Exception ex) 
    { 
     _signaller.SetException(ex); 
    } 
    } 
} 

public static class BlockingCollectionExtensions 
{ 
    public static Task QueueAndWaitAsync<T> 
    (this BlockingCollection<SignalizableItem<T>> @this, T value) 
    { 
    var tcs = new TaskCompletionSource<object>(); 
    @this.Add(new SignalizableItem<T>(value, tcs)); 
    return tcs.Task; 
    } 
} 

用法很簡單 - 在製片方,你根本就

await collection.QueueAndWaitAsync(value); 

在消費者方面,你會解開價值和信號準備時:

var item = collection.Take(); 

item.Process 
(
    data => 
    { 
    // Your processing 
    ... 
    } 
); 

,當然還有,收集將BlockingCollection<SignalizableItem<YourType>>,而不是BlockingCollection<YourType>

你可以進一步通過添加另一個擴展方法簡化處理:

public static void Process<T> 
    (this BlockingCollection<SignalizableItem<T>> @this, Action<T> action) 
{ 
    @this.Take().Process(action); 
} 

它也可能是實現消除(簡單CancellationToken應該能正常運行)或關機的另一種形式是個好主意。

東西實際可用可與

public static void ProcessAll<T> 
    (this BlockingCollection<SignalizableItem<T>> @this, Action<T> action, 
    CancellationToken cancellationToken) 
{ 
    SignalizableItem<T> val; 
    while (@this.TryTake(out val, -1, cancellationToken)) val.Process(action); 
} 

抽象掉整個處理機制,並暴露只是簡單的動作代表結束。

+0

我在哪個命名空間中找到'Unit'類型? – Sebastian

+1

@塞巴斯蒂安哦,我的壞 - 我通常在不想通過某種類型時使用'Unit',但我試圖將它從樣本中移除;只需用'object'替換它即可。 – Luaan