2012-07-02 67 views
2

我想實現一個隊列,它能夠從多個線程中的多個生產者獲取事件/項目,並在單個線程中使用它們。這個隊列將在一些關鍵環境中工作,所以我非常關心它的穩定性。Rx隊列實現和調度程序緩衝區

我一直在使用的Rx能力來實現它,但我有2個問題:

  1. 這是實現好不好?或者它可能有些缺陷,我不知道? (作爲替代 - 帶有隊列和鎖的手動實現)
  2. 什麼是分派器的緩衝區長度?它可以處理100k個排隊物品嗎?

下面的代碼說明了我的方法,使用一個簡單的TestMethod。它的輸出顯示所有值都來自不同的線程,但在另一個線程上處理。

[TestMethod()] 
public void RxTest() 
{ 
    Subject<string> queue = new Subject<string>(); 

    queue 
     .ObserveOnDispatcher() 
     .Subscribe(s => 
         { 
          Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId); 
         }, 
        () => Dispatcher.CurrentDispatcher.InvokeShutdown()); 

    for (int j = 0; j < 10; j++) 
    { 
     ThreadPool.QueueUserWorkItem(o => 
     { 
      for (int i = 0; i < 100; i++) 
      { 
       Thread.Sleep(10); 
       queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId)); 
      } 
      queue.OnCompleted(); 
     }); 
    } 


    Dispatcher.Run(); 
} 

回答

3

看看EventLoopScheduler。它內置於RX,我認爲它可以做你想做的一切。

您可以採取任何數量可觀的,叫.ObserveOn(els)els是你的一個EventLoopScheduler的實例)和你現在編組從多個線程多個可觀測到一個線程,每個呼叫排隊OnNext順序。

+0

EventLoopScheduler可能是我的選擇,但可以在當前線程上「運行」該調度程序嗎?在調度員的運行阻塞方式? – deafsheep

+0

@deafsheep EventLoopScheduler在每個項目的不同線程上連續執行工作。 CurrentThreadScheduler在當前線程上執行代碼(因爲一次線程只能執行一件事,所以它會自動地串行)。如果您使用錯誤,它也會產生比您可以輸入更快的死鎖情況。對於你所描述的情況,vanilla EventLoopScheduler可能是一種可行的方法。 –

+0

@AndersonImes - 如果沒有掛起項目,'EventLoopScheduler'只使用一個新線程。如果排隊的號碼繼續使用相同的線程。 – Enigmativity

3

我不確定Subject在嚴重多線程場景中的行爲。我可以想象,儘管如此,BlockingCollection(及其基礎ConcurrentQueue)在你談論的情況下已經磨損很多。並且很容易啓動。

var queue = new BlockingCollection<long>(); 

// subscribing 
queue.GetConsumingEnumerable() 
    .ToObservable(Scheduler.NewThread) 
    .Subscribe(i => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId)); 

// sending 
Observable.Interval(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool) 
      .Do(i => Debug.WriteLine("Value: {0}, Sent on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId)) 
      .Subscribe(i => queue.Add(i)); 

你當然不想碰到隊列和鎖。 ConcurrentQueue實現非常出色,並且一定會處理您正在討論的大小隊列。

+0

非常感謝你,你的例子非常好,並且完全按照我的需要工作,而不會搞亂Dispatcher。 – deafsheep

+0

雖然我認爲你的實現存在問題 - 如果這個隊列在一個月內有效,該怎麼辦?它會將所有物品存儲在它的權利?這意味着它會消耗大量的記憶來保存這一切? – deafsheep

+0

@deafsheep項目正在從隊列中流失,因此不會將它們全部存儲,除非它們沒有被佔用,這是您想要的行爲。 – yamen