2011-09-08 109 views
2

據我所知,訂閱方法應該是異步的,而運行是同步的。但是這段代碼是以同步方式工作的。任何人都可以修復它嗎?爲什麼不是這個代碼表現異步

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Reactive.Linq; 

namespace RxExtensionsDemo 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      IObservable<int> source = Observable.Generate<int, int>(0, i => i < 10000, i => i + 1, i => i * i); 

      IDisposable subscription = source.Subscribe(x => { Console.WriteLine("Received {0} from source", x); }, ex => 
      { 
       Console.WriteLine("Error occured"); 

      },() => 
      { 
       Console.WriteLine("Source said there are no more messages to follow"); 
      }); 

      Console.WriteLine("Asynchronous"); 

      Console.ReadKey(); 
     } 
    } 
} 

我總是看到異步寫入控制檯在最後。

+0

我不認爲'Subscribe'始終是異步的。 – CodesInChaos

+0

並且即使它不能保證一個線程在任何特定時間產生 –

+0

我想我所缺少的是指定Scheduler選項:Scheduler.ThreadPool作爲最後一個。指定完成了這項工作:) – Jaggu

回答

3

默認情況下Observable.Generate使用Scheduler.CurrentThread。但是,您可以指定不同的調度,以獲得所需的異步行爲:

IObservable<int> source = Observable.Generate<int, int>(
    0, 
    i => i < 10000, 
    i => i + 1, 
    i => i * i, 
    Scheduler.NewThread 
); 

Scheduler類是在System.Reactive.Concurrency命名空間。

其他可能的異步預定義調度程序是Scheduler.TaskPoolScheduler.ThreadPool