2017-09-05 65 views
2

每個項目假設我有這樣的代碼:的Rx - 消費新線程

static void Main(string[] args) 
    { 
     var scheduler = NewThreadScheduler.Default; 
     var enumerable = Enumerable.Range(0, 100); 

     enumerable 
      .ToObservable(scheduler) 
      .SubscribeOn(scheduler) 
      .Subscribe(item => 
      { 
       Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId); 

       // simulate long running operation 
       Thread.Sleep(1000); 
      }); 

     Console.ReadKey(); 
    } 

正如你我轉換了IEnumerable到的IObservable。然後我想消耗新線程中的每個項目,所以我使用SubsribeOn(調度程序)。不幸的是,每個迭代在同一個線程上工作,因此一個迭代會在下一個塊中運行

結果是:

Consuming 0 on Thread: 4 
Consuming 1 on Thread: 4 
Consuming 2 on Thread: 4 
Consuming 3 on Thread: 4 
Consuming 4 on Thread: 4 
.... 

是有可能迫使這種行爲?

+0

不應該是'ObserveOn'而不是'SubscribeOn'嗎? – Dirk

+0

我已經嘗試過ObserveOn,SubscribeOn和兩個在同一時間沒有成功。 – Lukas

+0

我不想在新線程上產生,但我想要的是消耗新線程上的每個項目。 – Lukas

回答

3

你看到的行爲完全是由設計。

Rx的基本原理是它的語法,它聲明一個流被定義爲一系列零個或多個OnNext調用,然後是可選的OnErrorOnCompleted

具體而言,Rx語法規定對於給定訂戶,這些消息中的每一個按順序遞送

所以你看到的是正確的行爲 - 沒有併發執行OnNext處理程序。鑑於這種故意的限制,爲每個OnNext創建一個新線程將是相當浪費的。

如果您追蹤代碼的次數足夠多,您會發現NewThreadScheduler專門利用EventLoopScheduler來重新使用每個訂戶的線程。綽號NewThreadScheduler確實說明每個訂戶獲得新線程,而不是每個事件。

要查看此信息,請修改您的代碼,以便我們有兩個訂閱者以不同的速度運行。你會看到每一個都有自己的線程,並進入它的自己的節奏和速度越快由暢通較慢:

var scheduler = NewThreadScheduler.Default; 
var enumerable = Enumerable.Range(0, 100); 

var xs = enumerable 
    .ToObservable(scheduler) 
    .SubscribeOn(scheduler); 

xs.Subscribe(item => 
{ 
    Console.WriteLine("Slow consuming {0} on Thread: {1}", 
     item, Thread.CurrentThread.ManagedThreadId); 

    // simulate slower long running operation 
    Thread.Sleep(1000); 
}); 

xs.Subscribe(item => 
{ 
    Console.WriteLine("Fast consuming {0} on Thread: {1}", 
     item, Thread.CurrentThread.ManagedThreadId); 

    // simulate faster long running operation 
    Thread.Sleep(500); 
}); 

Console.ReadKey(); 

您可以通過找到一個讀的Rx Design Guidelines是非常有幫助的。

允許在訂戶中同時處理事件的願望暗示了一個擁有多個消費者的隊列可能就是您以後的目標 - 並且您可能會看到Rx之外,例如BCL ConcurrentQueue<T>。也可以將消息投影到異步調用中,並在完成時收集結果,而不違反Rx語法約束。

例如以下是一些類似的代碼,可以隨意處理流中每個數字不同的時間長度。你可以看到結果出現亂序,彼此不受阻礙。這不是很棒的代碼,但它很重要。如果異步工作是IO綁定的,它可能是真正有用的。還要注意使用Observable.Range,它避免使用Enumerable.Range().ToObservable()組合。在.NET Core 2.0上測試:

var random = new Random(); 

// stop the threadpool from throttling us as it grows 
ThreadPool.SetMinThreads(100, 1); 

Observable.Range(0, 100) 
.SelectMany(x => Observable.Start(() => 
{ 
    Console.WriteLine($"Started {x}"); 
    Thread.Sleep(random.Next(1, 10) * 1000); 
    return x; 
})) 
.Subscribe(item => 
{ 
    Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}"); 
}); 

Console.ReadKey();