2013-12-20 77 views
1

我想實現使用任務並行庫以下行爲:獲取信息的批次可用

收到郵件時,我想順序,但在小組處理它們。所以當第一條消息到達時,應立即處理。如果2級的消息進來,而第一個被處理的話就應該一組2

在處理我幾乎能得到我想要使用什麼BatchBlock鏈接到ActionBlock

var batchBlock = new BatchBlock<int>(100); 

var actionBlock = new ActionBlock<int[]>(list => 
    { 
     // do work 

     // now trigger 
     batchBlock.TriggerBatch(); 
    }); 

batchBlock.LinkTo(actionBlock); 

的問題上面的代碼是,如果一個項目在TriggerBatch()呼叫之後到達,那麼它需要等待批次填滿。如果我在每個帖子後觸發批處理,則ActionBlock總是會收到單個消息。

+0

你能解釋爲什麼你需要這種行爲? – svick

+0

@svick我想批量更新數據庫,所以數據庫往返次數更少 - 我的更改以流的形式出現。 – Brownie

回答

0

而不是BatchBlock,您可以使用BufferBlockTask接收來自它的項目,並根據您的邏輯將它們批量重新發送到目標。因爲您需要嘗試發送包含批次的消息,並在其他項目進入時取消它,所以目標塊(樣本中的actionBlock)必須將BoundedCapacity設置爲1.

因此,您所做的是你首先得到一些東西。當你有這些時,你開始異步發送,你也嘗試接收更多的項目。如果先發送完成,則重新開始。如果先完成接收,則取消發送,將收到的項目添加到批處理中,然後再次啓動兩個異步操作。

實際代碼有點複雜,因爲它需要處理一些角落案例(接收和發送同時完成;發送無法取消;接收完成,因爲整個完成;異常) :

public static ITargetBlock<T> CreateBatchingWrapper<T>(
ITargetBlock<IReadOnlyList<T>> target) 
{ 
    // target should have BoundedCapacity == 1, 
    // but there is no way to check for that 

    var source = new BufferBlock<T>(); 

    Task.Run(() => BatchItems(source, target)); 

    return source; 
} 

private static async Task BatchItems<T>(
    IReceivableSourceBlock<T> source, ITargetBlock<IReadOnlyList<T>> target) 
{ 
    try 
    { 
     while (true) 
     { 
      var messages = new List<T>(); 

      // wait for first message in batch 
      if (!await source.OutputAvailableAsync()) 
      { 
       // source was completed, complete target and return 
       target.Complete(); 
       return; 
      } 

      // receive all there is right now 
      source.ReceiveAllInto(messages); 

      // try sending what we've got 
      var sendCancellation = new CancellationTokenSource(); 
      var sendTask = target.SendAsync(messages, sendCancellation.Token); 

      var outputAvailableTask = source.OutputAvailableAsync(); 

      while (true) 
      { 
       await Task.WhenAny(sendTask, outputAvailableTask); 

       // got another message, try cancelling send 
       if (outputAvailableTask.IsCompleted 
        && outputAvailableTask.Result) 
       { 
        sendCancellation.Cancel(); 

        // cancellation wasn't successful 
        // and the message was received, start another batch 
        if (!await sendTask.EnsureCancelled() && sendTask.Result) 
         break; 

        // send was cancelled, receive messages 
        source.ReceiveAllInto(messages); 

        // restart both Tasks 
        sendCancellation = new CancellationTokenSource(); 
        sendTask = target.SendAsync(
         messages, sendCancellation.Token); 
        outputAvailableTask = source.OutputAvailableAsync(); 
       } 
       else 
       { 
        // we get here in three situations: 
        // 1. send was completed succesfully 
        // 2. send failed 
        // 3. input has completed 
        // in cases 2 and 3, this await is necessary 
        // in case 1, it's harmless 
        await sendTask; 

        break; 
       } 
      } 
     } 
    } 
    catch (Exception e) 
    { 
     source.Fault(e); 
     target.Fault(e); 
    } 
} 

/// <summary> 
/// Returns a Task that completes when the given Task completes. 
/// The Result is true if the Task was cancelled, 
/// and false if it completed successfully. 
/// If the Task was faulted, the returned Task is faulted too. 
/// </summary> 
public static Task<bool> EnsureCancelled(this Task task) 
{ 
    return task.ContinueWith(t => 
    { 
     if (t.IsCanceled) 
      return true; 
     if (t.IsFaulted) 
     { 
      // rethrow the exception 
      ExceptionDispatchInfo.Capture(task.Exception.InnerException) 
       .Throw(); 
     } 

     // completed successfully 
     return false; 
    }); 
} 

public static void ReceiveAllInto<T>(
    this IReceivableSourceBlock<T> source, List<T> targetCollection) 
{ 
    // TryReceiveAll would be best suited for this, except it's bugged 
    // (see http://connect.microsoft.com/VisualStudio/feedback/details/785185) 
    T item; 
    while (source.TryReceive(out item)) 
     targetCollection.Add(item); 
} 
0

你也可以使用Timer;這將每10秒觸發一次批次