3

給出在C#,TPL,並行擴展,異步CTP,反應擴展中執行異步操作的衆多新方法我在想什麼最簡單的方法來並行化獲取和處理部分的以下將是:如何爲foreach創建並行預取

foreach(string url in urls) 
{ 
    var file = FetchFile(url); 
    ProcessFile(file); 
} 

的條件是,雖然文件可在任何時候提取ProcessFile只能同時處理一個文件,應按順序調用。

簡而言之,獲得FetchFileProcessFile以流水線方式表現的行爲,即同時發生的最簡單方法是什麼?

回答

1

這裏的RX方式。這個擴展將改變的URI的蒸汽進入流的數據流:

public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout) 
    { 
     return 
      from wc in source.Select(WebRequest.Create) 
      from s in Observable 
       .FromAsyncPattern<WebResponse>(wc.BeginGetResponse, 
        wc.EndGetResponse)() 
       .Timeout(timeout, Observable.Empty<WebResponse>()) 
       .Catch(Observable.Empty<WebResponse>()) 
      select s.GetResponseStream(); 
    } 

用法:

new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" } 
    .ToObservable() 
    .RequestToStream(TimeSpan.FromSeconds(5)) 
    .Do(stream = > ProcessStream(stream)) 
    .Subscribe(); 

編輯:哎呀,有沒有注意到文件寫入序列化要求。這部分可以通過採用.Concat來完成這實質上是一個RX隊列(另一個名爲.zip)

讓我們.StreamToFile擴展:

public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source) 
    { 
     return Observable.Defer(() => 
      source.Item1.AsyncRead().WriteTo(File.Create(source.Item2))); 
    } 

現在你可以擁有Web請求平行的,但序列文件寫來自他們:

 new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" } 
      .ToObservable() 
      .RequestToStream(TimeSpan.FromSeconds(5)) 
      .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat")) 
      .Select(x => x.StreamToFile()) 
      .Concat() 
      .Subscribe(); 
+0

這並不能保證一次只在一個線程上調用ProcessStream – 2011-03-29 07:32:19

+0

@ paul-betts現在怎麼樣? – 2011-03-29 16:04:43

1

鑑於ProcessFile的限制,我會說你應該使用TPL異步獲取數據,然後將引用預加載數據的令牌排入隊列。然後您可以擁有一個後臺線程,將項目從隊列中取出並逐個遞交給ProcessFile。這是一個producer/consumer模式。

對於隊列,你可以看看BlockingCollection,它可以提供一個線程安全隊列,它也具有可以節制工作量的好效果。

1

因爲我不知道所有花哨的機制,我可能會做舊時尚的方式,但我懷疑它會爲「簡單」的分類:

var q = new Queue<MyFile>(); 
var ev = new ManualResetEvent(false); 

new System.Threading.Thread(() => 
{ 
    while (true) 
    { 
     ev.WaitOne(); 
     MyFile item; 
     lock (q) 
     { 
      item = q.Dequeue(); 
      if (q.Count == 0) 
       ev.Reset(); 
     } 
     if (item == null) 
      break; 
     ProcessFile(item); 
    } 
}).Start(); 
foreach(string url in urls) 
{ 
    var file = FetchFile(url); 
    lock (q) 
    { 
     q.Enqueue(file); 
     ev.Set(); 
    } 
} 
lock (q) 
{ 
    q.Enqueue(null); 
    ev.Set(); 
} 
1

異步實際上並不表示並行。這僅僅意味着你不會阻止等待另一個操作。但是你你利用異步I/O爲你下載的網址,不阻塞線程的,也就是說,如果你這樣做,你並不需要儘可能多線程的網址並行下載它們:

var client = new WebClient(); 
var syncLock = new object(); 
TaskEx.WhenAll(urls.Select(url => { 
    client.DownloadDataTaskAsync(url).ContinueWith((t) => { 
    lock(syncLock) { 
     ProcessFile(t.Result); 
    } 
    }); 
})); 

基本上我們爲每個網址創建一個異步下載任務,然後當任何任務完成時,我們調用一個使用普通對象的延續,以確保ProcessFile順序發生。 WhenAll將不會返回,直到最後的ProcessFile延續完成。

你可能避免與RX的ReplaySubject(當然,它會在內部鎖定)顯式鎖:

var pipeline = new ReplaySubject<byte[]>(); 
var files = pipeline.ToEnumerable(); 
var client = new WebClient(); 
TaskEx.WhenAll(urls 
     .Select(download => client.DownloadDataTaskAsync((string) download) 
      .ContinueWith(t => pipeline.OnNext(t.Result)) 
     ) 
    ).ContinueWith(task => pipeline.OnCompleted(task)); 
foreach(var file in files) { 
    ProcessFile(file); 
} 

在這裏我們使用一個ReplaySubject作爲我們的文件下載的管道。每個下載都異步完成,並將其結果發佈到foreach阻塞的流水線上(即按順序發生)。當所有任務完成後,我們完成觀察結果,它將退出foreach