這裏的RX方式。這個擴展將改變的URI的蒸汽進入流的數據流:
public static IObservable<Stream> RequestToStream(this IObservable<string> source,
TimeSpan timeout)
{
return
from wc in source.Select(WebRequest.Create)
from s in Observable
.FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
wc.EndGetResponse)()
.Timeout(timeout, Observable.Empty<WebResponse>())
.Catch(Observable.Empty<WebResponse>())
select s.GetResponseStream();
}
用法:
new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Do(stream = > ProcessStream(stream))
.Subscribe();
編輯:哎呀,有沒有注意到文件寫入序列化要求。這部分可以通過採用.Concat來完成這實質上是一個RX隊列(另一個名爲.zip)
讓我們.StreamToFile擴展:
public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
{
return Observable.Defer(() =>
source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
}
現在你可以擁有Web請求平行的,但序列文件寫來自他們:
new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
.Select(x => x.StreamToFile())
.Concat()
.Subscribe();
這並不能保證一次只在一個線程上調用ProcessStream – 2011-03-29 07:32:19
@ paul-betts現在怎麼樣? – 2011-03-29 16:04:43