我想實現一個隊列,它能夠從多個線程中的多個生產者獲取事件/項目,並在單個線程中使用它們。這個隊列將在一些關鍵環境中工作,所以我非常關心它的穩定性。Rx隊列實現和調度程序緩衝區
我一直在使用的Rx能力來實現它,但我有2個問題:
- 這是實現好不好?或者它可能有些缺陷,我不知道? (作爲替代 - 帶有隊列和鎖的手動實現)
- 什麼是分派器的緩衝區長度?它可以處理100k個排隊物品嗎?
下面的代碼說明了我的方法,使用一個簡單的TestMethod。它的輸出顯示所有值都來自不同的線程,但在另一個線程上處理。
[TestMethod()]
public void RxTest()
{
Subject<string> queue = new Subject<string>();
queue
.ObserveOnDispatcher()
.Subscribe(s =>
{
Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
},
() => Dispatcher.CurrentDispatcher.InvokeShutdown());
for (int j = 0; j < 10; j++)
{
ThreadPool.QueueUserWorkItem(o =>
{
for (int i = 0; i < 100; i++)
{
Thread.Sleep(10);
queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
}
queue.OnCompleted();
});
}
Dispatcher.Run();
}
EventLoopScheduler可能是我的選擇,但可以在當前線程上「運行」該調度程序嗎?在調度員的運行阻塞方式? – deafsheep
@deafsheep EventLoopScheduler在每個項目的不同線程上連續執行工作。 CurrentThreadScheduler在當前線程上執行代碼(因爲一次線程只能執行一件事,所以它會自動地串行)。如果您使用錯誤,它也會產生比您可以輸入更快的死鎖情況。對於你所描述的情況,vanilla EventLoopScheduler可能是一種可行的方法。 –
@AndersonImes - 如果沒有掛起項目,'EventLoopScheduler'只使用一個新線程。如果排隊的號碼繼續使用相同的線程。 – Enigmativity