0

我想實現一個帶容量上限(capped)和緩衝(buffered)的作業執行器。

它只會有一個方法:

// Sketch of the desired API (declarations only, no bodies): values are queued and later executed in batches.
public class CappedBufferedExecutor { 
    // bufferCapping: per the question text, the cap on unique buffered values that triggers an execution.
    // fillTimeInMillisec: per the question text, the number of milliseconds after which an execution is triggered.
    public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec); 
    // Queues a value asynchronously; per the asker's follow-up comment, the bool reports whether the work succeeded.
    public Task<bool> EnqueueAsync(string val); 
} 

其想法是:值以異步方式入隊;一旦經過了 fillTimeInMillisec 毫秒,或者緩衝區中不重複值的數量達到上限,就實際執行處理,屆時相關的異步任務全部完成。執行可能需要很長時間,在執行進行的同時,緩衝區可以重新填充,並啓動新的異步執行。

我想到的僞代碼如下:

  • 使用 Timer 等待 fillTime 經過;一旦超時,就創建一個新的任務來完成工作(見下文)。
  • 有新值到達時,以讀模式鎖定 rwlock。檢查緩衝區是否已滿,如果已滿,就在 ManualResetEvent 或 TaskCompletionSource 上等待。
  • 將新值添加到緩衝區(HashSet<string>)。
  • 如果緩衝區已滿,則創建一個新的執行任務,該任務以寫模式鎖定 rwlock,對所有已收集的值完成工作,並使用 TaskCompletionSource 喚醒所有等待中的任務。
  • 在 TaskCompletionSource 上等待(上一步中提到的)緩衝任務執行完畢。

我的問題:如何讓 Timer 與緩衝區已滿的檢查保持同步?緩衝區已滿時如何等待?如何開始執行,並在新值到達時在不同的 TaskCompletionSource 實例之間切換?

+0

爲什麼你需要從 'EnqueueAsync' 返回 'Task<bool>'?當緩衝區已滿時,改爲觸發一個附帶任務列表的事件可以嗎? – apocalypse

+0

如果前一批項目仍在處理中、尚未完成,而緩衝區又已填滿,會發生什麼情況? –

+0

@apocalypse bool 表示工作是否成功。這並不重要,可以按你的方式來做。 – Mugen

回答

2

這僅僅是概念,所以不要指望太多:-)

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApp 
{ 
    // Console demo: reads lines from standard input, feeds them into a capped, time-limited buffer
    // and lets an Executor run SomeWork for every item of each flushed batch.
    class Program
    {
        // One shared generator (creating a new Random per call in quick succession can repeat seeds).
        // System.Random is not thread-safe and SomeWork runs on several pool threads at once,
        // so every access is serialized through rndLock.
        static Random rnd = new Random();
        static readonly object rndLock = new object();

        // Entry point: wires buffer -> executor, then enqueues every non-blank input line.
        static void Main (string[] args)
        {
            var buffer = CreateBuffer();

            var executor = new Executor<string> (SomeWork, buffer);
            executor.ProcessingStarted += Executor_ProcessingStarted;

            // A blank line (or end of input, where ReadLine returns null) ends the session.
            // The terminator itself is NOT enqueued, so it is never processed as a job.
            string userInput;

            while (!string.IsNullOrWhiteSpace (userInput = Console.ReadLine()))
            {
                buffer.Enqueue (userInput);
            }

            executor.Dispose();
        }

        //----------------------------------------------------------------------------------------------------------------------------------

        // Builds the demo buffer: unique items, flushed at 3 items or 5 seconds after an alert is armed.
        private static IBuffer<string> CreateBuffer()
        {
            var buffer = new UniqueItemsBuffer<string> (3);

            buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");

            var alert = new Alert();

            var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));

            return bufferWithTimeout;
        }

        //----------------------------------------------------------------------------------------------------------------------------------

        // Simulated job: sleeps for a random 1000-7999 ms and always reports success.
        public static bool SomeWork (string x)
        {
            int delay;

            lock (rndLock)
            {
                delay = rnd.Next (1000, 8000);
            }

            Console.WriteLine ($" +++ Starting SomeWork for: {x}, delay: {delay} ms");

            Thread.Sleep (delay);

            Console.WriteLine ($" --- SomeWork for: {x} - finished.");

            return true;
        }

        //----------------------------------------------------------------------------------------------------------------------------------

        // Reports when a whole batch has finished; the wait runs on a pool thread so the caller is not blocked.
        private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
        {
            Task.Run (() =>
            {
                Task.WaitAll (items.ToArray());
                Console.WriteLine ("Finished processing tasks, count = " + items.Count);
            });
        }
    }

    //====== actual code =================================================================================================================== 

    // Handler signature for the events declared in this file: receives a read-only batch of items.
    public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code 

    // Work callback handed to the executor.
    public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if the job succeeded 

    //====================================================================================================================================== 

    // Source of item batches: anything that can announce "these items are ready to be processed".
    public interface IDataAvailableEvent<T> 
    { 
     event ItemsAvailable<T> DataAvailable; // raised when the buffer needs to be processed (the buffer should be cleared before raising this event) 
    } 

    //====================================================================================================================================== 

    // Announces the tasks started for a batch. Note: the type parameter T is not used by the member below.
    public interface IProcessingStartedEvent<T> 
    { 
     event ItemsAvailable<Task<bool>> ProcessingStarted; // executor raises this event when all tasks are created and started 
    } 

    //====================================================================================================================================== 

    // A buffer that collects items and hands them out in batches through DataAvailable.
    public interface IBuffer<T> : IDataAvailableEvent<T> 
    { 
     bool Enqueue (T item); // adds a new item to the buffer (an implementation may ignore the item, for example when only unique items are kept) 
           // returns: true = buffer is not empty, false = buffer is empty 

     void FlushBuffer(); // should clear the buffer and raise the event (or not raise it if the buffer was already empty) 
    } 

    //====================================================================================================================================== 

    // raises DataAvailable event when buffer cap is reached 
    // ignores duplicates 

    // you can only use this class from one thread 

    // Set-backed buffer: duplicates are dropped, and the whole content is handed to DataAvailable
    // as soon as the number of stored items equals the configured capacity.
    // No synchronization is done here; use it from a single thread or wrap it.
    public class UniqueItemsBuffer<T> : IBuffer<T>
    {
        readonly int capacity;
        HashSet<T> items = new HashSet<T>();

        public event ItemsAvailable<T> DataAvailable;

        // capacity: number of unique items that triggers an automatic flush (10 when omitted).
        public UniqueItemsBuffer (int capacity = 10)
        {
            this.capacity = capacity;
        }

        // Stores the item unless it is already present; flushes when the cap is hit by this insertion.
        // Returns true when the buffer still holds items after the call.
        public bool Enqueue (T item)
        {
            bool wasAdded = items.Add (item);

            if (wasAdded)
            {
                if (items.Count == capacity)
                {
                    FlushBuffer();
                }
            }

            return items.Count > 0;
        }

        // Logs the current size; when non-empty, empties the buffer and raises DataAvailable with a snapshot.
        public void FlushBuffer()
        {
            Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);

            if (items.Count == 0)
            {
                return;
            }

            // Copy first: the set is cleared before handlers run, so they see a stable list
            var snapshot = new List<T> (items);
            items.Clear();

            DataAvailable?.Invoke (snapshot);
        }
    }

    //====================================================================================================================================== 

    // Listens to a batch source and starts one pool task per item of every batch.
    // Dispose detaches it from the source.
    public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
    {
        readonly ProcessItem<T> work;
        readonly IDataAvailableEvent<T> dataEvent;

        public event ItemsAvailable<Task<bool>> ProcessingStarted;

        // work: callback run for each item; dataEvent: the source whose DataAvailable event is subscribed to.
        public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
        {
            this.work = work;
            this.dataEvent = dataEvent;

            this.dataEvent.DataAvailable += DataEvent_DataAvailable;
        }

        // Stops receiving batches from the source.
        public void Dispose() => dataEvent.DataAvailable -= DataEvent_DataAvailable;

        // Starts a task per item (in batch order) and then publishes the started tasks.
        private void DataEvent_DataAvailable (IReadOnlyList<T> items)
        {
            Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);

            // ToList forces the projection to run now, so every task is started before the event is raised
            List<Task<bool>> started = items
                .Select (item => Task.Run (() => work (item)))
                .ToList();

            Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");

            ProcessingStarted?.Invoke (started);
        }
    }

    //====================================================================================================================================== 

    // if you want to fill buffer using many threads - use this decorator 

    // Decorator that serializes Enqueue and FlushBuffer calls on a wrapped buffer with a private lock
    // and re-raises the wrapped buffer's DataAvailable event as its own.
    // Instances are obtained through MakeThreadSafe.
    public sealed class ThreadSafeBuffer<T> : IBuffer<T>
    {
        readonly object sync = new object();
        readonly IBuffer<T> target;

        public event ItemsAvailable<T> DataAvailable;

        private ThreadSafeBuffer (IBuffer<T> target)
        {
            this.target = target;
            this.target.DataAvailable += ForwardDataAvailable; // TODO: unpin event :P
        }

        // Returns the argument itself when it is already a ThreadSafeBuffer, otherwise a new wrapper around it.
        public static IBuffer<T> MakeThreadSafe (IBuffer<T> target)
        {
            return (target is ThreadSafeBuffer<T>) ? target : new ThreadSafeBuffer<T> (target);
        }

        // Forwards to the wrapped buffer while holding the lock.
        public bool Enqueue (T item)
        {
            lock (sync)
            {
                bool hasItems = target.Enqueue (item);
                return hasItems;
            }
        }

        // Forwards to the wrapped buffer while holding the lock.
        public void FlushBuffer()
        {
            lock (sync)
            {
                target.FlushBuffer();
            }
        }

        // Relays the wrapped buffer's batch to this wrapper's subscribers.
        private void ForwardDataAvailable (IReadOnlyList<T> items)
        {
            DataAvailable?.Invoke (items);
        }
    }

    //====================================================================================================================================== 

    // and now if you want to process buffer after elapsed time 

    // Abstraction over a one-shot delayed callback, so the buffer does not depend on a concrete timer.
    public interface IAlert 
    { 
     // The returned CancellationTokenSource is what callers in this file cancel to suppress a pending alert.
     CancellationTokenSource CreateAlert (TimeSpan delay, Action action); // will execute 'action' after given delay (non blocking) 
    } 

    // I didn't use much timers, so idk is this code good 

    // IAlert implementation based on System.Timers.Timer: each alert is a one-shot timer that
    // invokes the action when it elapses, unless the returned CancellationTokenSource was cancelled first.
    public class Alert : IAlert
    {
        // Timers that have not fired yet. The list is touched by the thread calling CreateAlert and by
        // the threads running Elapsed handlers, and List<T> is not thread-safe, so it is guarded by timersLock.
        readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();
        readonly object timersLock = new object();

        // Schedules 'action' to run once after 'delay' without blocking the caller.
        // Returns the source whose cancellation suppresses the action (the timer itself still runs to its tick).
        // Note: the System.Timers.Timer constructor rejects a non-positive interval with ArgumentException.
        public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
        {
            var cts = new CancellationTokenSource();

            var timer = new System.Timers.Timer (delay.TotalMilliseconds);
            timer.AutoReset = false; // just one tick

            lock (timersLock)
            {
                timers.Add (timer);
            }

            timer.Elapsed += (sender, e) =>
            {
                lock (timersLock)
                {
                    timers.Remove (timer);
                }

                timer.Dispose();

                // Read the flag from the source itself: unlike cts.Token, this property does not
                // throw if the caller has already disposed the source.
                if (cts.IsCancellationRequested) return;

                action.Invoke();
            };

            timer.Enabled = true;

            return cts;
        }
    }

    // thread safe (maybe :-D) 

    // Decorator that adds a time limit to another buffer: when an item is enqueued and no alert is pending,
    // an alert is armed; when it fires, the wrapped buffer is flushed. Any batch coming out of the wrapped
    // buffer disarms the pending alert and is re-raised through this object's DataAvailable event.
    // All state changes happen under 'sync'.
    public class BufferWithTimeout<T> : IBuffer<T>
    {
        public event ItemsAvailable<T> DataAvailable;

        readonly IBuffer<T> target;
        readonly IAlert alert;
        readonly TimeSpan timeout;

        readonly object sync = new object();

        // The currently pending alert, or null when none is armed. Guarded by 'sync'.
        CancellationTokenSource cts;

        // target: buffer to decorate (wrapped so it can be flushed from the alert's thread);
        // alert: factory for delayed callbacks; timeout: delay used for every alert.
        public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
        {
            this.target = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
            this.alert = alert;
            this.timeout = timeout;

            target.DataAvailable += Target_DataAvailable; // TODO: unpin event
        }

        // A batch left the wrapped buffer: the pending alert is obsolete, so disarm it and pass the batch on.
        private void Target_DataAvailable (IReadOnlyList<T> items)
        {
            lock (sync)
            {
                DisableTimer();
            }

            DataAvailable?.Invoke (items);
        }

        // Adds the item to the wrapped buffer and arms an alert if items are waiting and none is pending.
        // Returns what the wrapped buffer returned (true = it still holds items).
        public bool Enqueue (T item)
        {
            lock (sync)
            {
                bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)

                // an empty buffer needs no timer
                if (hasItems && cts == null) // if timer is not enabled
                {
                    Console.WriteLine ("TIMER :: created alert");

                    // The callback carries the identity of its own alert so that HandleAlert can
                    // recognise a callback that was superseded while it waited for the lock.
                    // 'created' is assigned before the lock is released, and the callback only reads
                    // it after taking the same lock.
                    CancellationTokenSource created = null;
                    created = alert.CreateAlert (timeout, () => HandleAlert (created));
                    cts = created;
                }

                return hasItems;
            }
        }

        // Disarms the pending alert and flushes the wrapped buffer immediately.
        public void FlushBuffer()
        {
            lock (sync)
            {
                DisableTimer();
                target.FlushBuffer();
            }
        }

        // Runs when an alert elapses. 'source' is the alert that fired.
        private void HandleAlert (CancellationTokenSource source)
        {
            lock (sync)
            {
                // The alert may have passed its cancellation check and then lost the race for the lock
                // to a flush plus a new Enqueue; in that case a newer alert owns the buffer and this
                // one must not flush the new, still-filling batch early.
                if (!ReferenceEquals (cts, source)) return;

                Console.WriteLine ("TIMER :: handler, will call buffer flush");

                // This alert has fired, so nothing is pending any more. Clearing it here (not only in
                // the DataAvailable path) keeps the timer re-armable even if the flush raises no event.
                cts = null;

                target.FlushBuffer();
            }
        }

        // Cancels and forgets the pending alert (if any). Must be called with 'sync' held.
        private void DisableTimer()
        {
            cts?.Cancel();
            cts = null;

            Console.WriteLine ("TIMER :: disable");
        }
    }
} 
+0

謝謝,我會試著把它應用到我的代碼中!小提示:你不需要自己保留對 'System.Timers.Timer' 的引用——框架會替你維護 :) – Mugen

+0

@Mugen:啊……確實如此。感謝你的提示。看起來他們在 .NET 2.0 中修復了這個問題。 – apocalypse

1

使用 Reactive Extensions(Rx)可以很容易地做到這一點。

// Demo driver: pushes three items through a Processor configured for batches of 2 items / 1000 ms,
// then waits for Enter before disposing the processor.
void Main()
{
    var processor = new Processor();
    processor.SetupBufferedProcessor(2, TimeSpan.FromMilliseconds(1000));

    foreach (var item in new[] { "A", "B", "C" })
    {
        processor.Enqueue(item);
    }

    // Keep the process alive until the user presses Enter
    Console.ReadLine();

    // Application is ending: release the processor
    processor.Dispose();
}


// Batches enqueued strings with Reactive Extensions and processes every non-empty batch on a pool thread.
// A batch closes when the time window elapses or the size limit is reached, whichever comes first.
public sealed class Processor : IDisposable
{
    private IDisposable subscription;
    private readonly Subject<string> subject = new Subject<string>();

    // Pushes one item into the pipeline. After Dispose the subject is completed, so further items are ignored.
    public void Enqueue(string item)
    {
        subject.OnNext(item);
    }

    // Builds the batching pipeline.
    // bufferSize: maximum number of items per batch; bufferCloseTimespan: maximum time a batch stays open.
    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
    {
        // Produce a list of strings every bufferCloseTimespan
        // or when the buffer holds bufferSize items, whichever comes first
        subscription = subject.AsObservable()
            .Buffer(bufferCloseTimespan, bufferSize)
            .Where(list => list.Count > 0) // suppress empty list (no items enqueued during the time window)
            .Subscribe(async list =>
            {
                // NOTE: Subscribe takes an Action, so this lambda is 'async void': nothing awaits it and
                // an exception thrown by the work would not be observable by the caller.
                await Task.Run(() =>
                {
                    Console.WriteLine(string.Join(",", list));
                    Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.
                });
            });
    }

    // Flushes the pending partial batch and tears the pipeline down.
    // Completing the subject makes Buffer emit whatever it has collected so far; merely disposing the
    // subscription would silently drop those items. The started work is not awaited here.
    public void Dispose()
    {
        subject.OnCompleted();

        subscription?.Dispose();
        subscription = null;
    }
}

這將輸出

A,B 

和,在一秒之後,

C 

Rx 的代碼在 GitHub 上。關於 Rx 的更多內容:http://www.introtorx.com/ 。以上是使用 Buffer 方法的一個基本示例。

這個例子還可以改進:保存對所創建的 Task 對象的引用,以便在應用程序結束之前能夠正確地等待它們完成。不過,它已經能讓你瞭解大致的思路。