2012-06-27 51 views
0

我有以下異步隊列處理路由。無功擴展等待隊列空

 var commandQueue = new BlockingCollection<MyCommand>(); 
     commandQueue 
      .GetConsumingEnumerable() 
      .ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5)) 
      .Subscribe(c => 
          { 
           try 
           { 
            ProcessCommand(c); 
           } 
           catch (Exception ex) 
           { 
            Trace.TraceError(ex.ToString()); 
           } 
          } 
      ); 

在一個特定的情況下(當我即將得到一些數據),我需要確保我commandQueue是前走出去,讓數據爲空。預計此操作將同步發生。基本上,我想這樣做

public void GetData() 
    { 
    commandQueue.WaitForEmpty(); 

    // could potentially be expressed: 
    // while (commandQueue.Count > 0) Thread.Sleep(10); 

    return GoGetTheData() 
    } 

我意識到,在理想情況下,所有的來電者「的GetData」異步......但有時它的必要,它發生在一個同步的方式...等我需要等待命令隊列爲空以確保數據的一致性和最新性。

我知道我如何使用ManualResetEvent輕鬆完成此操作...但我想知道System.Reactive/TPL是否有簡單的方法。

謝謝。

回答

1

這是一個比起初看起來更難的問題。你需要BlockingCollection(和底層的ConcurrentQueue)生產者 - 消費者的工作語義。但是你也希望能夠觀察這些收藏品的情況,包括等待'空'的信號。

最好的辦法就是從這裏看一看JobQueueParallelJobQueue

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7

其中包括一個可觀察的WhenQueueEmpty並可以控制同時運行的作業數量和排隊的作業(工種是同義這種情況下你的命令的概念)。

0

你能用這個嗎?

var dataObservable = Observable.Start(() => 
    { 
     commandQueue.WaitForEmpty(); 
     return GoGetTheData(); 
    }); 
+0

問題是,沒有這樣的方法WaitForEmpty – Jeff

+0

@ JeffN825 - 什麼是BlockCollection呢?這是你定義的東西嗎? – Enigmativity

+0

對不起,我不確定在我的例子中如何被截斷。它是BlockingCollection http://msdn.microsoft.com/en-us/library/dd267312.aspx – Jeff

0

在我看來,你的要求是

  • 異步此數據獲取數據
  • 並行處理
  • 重複此過程

如果(5度最大並行)這些都是你的要求,你不需要使用BlockingCollection,即它不是現有的API,那麼我認爲你可以很容易地用Rx解決這個問題。

var dataRequestScheduler = new EventLoopScheduler(); 
var subscription = GetTheData() 
    .Repeat() 
    .SubscribeOn(dataRequestScheduler) 
    .ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5) 
    .Subscribe(c => 
      { 
       try 
       { 
        ProcessCommand(c); 
       } 
       catch (Exception ex) 
       { 
        Trace.TraceError(ex.ToString()); 
       } 
      } 
     ); 

凡GetTheData方法返回的IObservable

你可能會利用Observable.Start和合並(5),以獲得你的最大5個線程,而無需自定義調度。