2016-05-20 66 views
2

我有一個我想要使用HttpClient併發下載的頁面的URL列表。 URL列表可能很大(100以上!)使用Rx和SelectMany限制併發請求

目前我已經有這樣的代碼:

var urls = new List<string> 
      { 
       @"http:\\www.amazon.com", 
       @"http:\\www.bing.com", 
       @"http:\\www.facebook.com", 
       @"http:\\www.twitter.com", 
       @"http:\\www.google.com" 
      }; 

var client = new HttpClient(); 

var contents = urls 
    .ToObservable() 
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute))); 

contents.Subscribe(Console.WriteLine); 

問題:由於SelectMany使用,任務的一大束創建幾乎在同一時間。看來,如果URL的列表足夠大,很多任務會給超時(我得到「任務被取消」例外)。

所以,我認爲應該有一種方法,可能使用某種調度程序來限制併發任務的數量,在給定時間不允許超過5或6個任務。

通過這種方式,我可以獲得併發下載,而無需啓動太多可能會失速的任務,就像他們現在所做的那樣。

如何做到這一點,所以我不飽和大量的超時任務?

非常感謝。

+1

你可能要考慮使用[數據流](https://msdn.microsoft.com/en-us/library/hh228603%28v= vs.110%29.aspx)API。 –

+0

你可以使用我的代碼來整合它嗎?我忽略瞭如何使用DataFlow來完成它。 TBH,我從來沒有用過,但看一些樣品會有很大的幫助。 – SuperJMN

回答

10

還記得SelectMany()實際上是Select().Merge()。雖然SelectMany沒有maxConcurrent參數,Merge()的確如此。所以你可以使用它。

從你的例子,你可以這樣做:

var urls = new List<string> 
    { 
     @"http:\\www.amazon.com", 
     @"http:\\www.bing.com", 
     @"http:\\www.facebook.com", 
     @"http:\\www.twitter.com", 
     @"http:\\www.google.com" 
    }; 

var client = new HttpClient(); 

var contents = urls 
    .ToObservable() 
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri))) 
    .Merge(2); // 2 maximum concurrent requests! 

contents.Subscribe(Console.WriteLine); 
1

下面是如何你可以用DataFlow API做一個例子:

private static Task DoIt() 
{ 
    var urls = new List<string> 
    { 
     @"http:\\www.amazon.com", 
     @"http:\\www.bing.com", 
     @"http:\\www.facebook.com", 
     @"http:\\www.twitter.com", 
     @"http:\\www.google.com" 
    }; 

    var client = new HttpClient(); 

    //Create a block that takes a URL as input 
    //and produces the download result as output 
    TransformBlock<string,string> downloadBlock = 
     new TransformBlock<string, string>(
      uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)), 
      new ExecutionDataflowBlockOptions 
      { 
       //At most 2 download operation execute at the same time 
       MaxDegreeOfParallelism = 2 
      }); 

    //Create a block that prints out the result 
    ActionBlock<string> doneBlock = 
     new ActionBlock<string>(x => Console.WriteLine(x)); 

    //Link the output of the first block to the input of the second one 
    downloadBlock.LinkTo(
     doneBlock, 
     new DataflowLinkOptions { PropagateCompletion = true}); 

    //input the urls into the first block 
    foreach (var url in urls) 
    { 
     downloadBlock.Post(url); 
    } 

    downloadBlock.Complete(); //Mark completion of input 

    //Allows consumer to wait for the whole operation to complete 
    return doneBlock.Completion; 
} 

static void Main(string[] args) 
{ 
    DoIt().Wait(); 
    Console.WriteLine("Done"); 
    Console.ReadLine(); 
} 
+0

哇。它看起來非常好,但我想知道如何使用Rx做同樣的事情。提前致謝! – SuperJMN

1

你能看到,如果這有助於?

var urls = new List<string> 
     { 
      @"http:\\www.amazon.com", 
      @"http:\\www.bing.com", 
      @"http:\\www.google.com", 
      @"http:\\www.twitter.com", 
      @"http:\\www.google.com" 
     }; 

var contents = 
    urls 
     .ToObservable() 
     .SelectMany(uri => 
      Observable 
       .Using(
        () => new System.Net.Http.HttpClient(), 
        client => 
         client 
          .GetStringAsync(new Uri(uri, UriKind.Absolute)) 
          .ToObservable())); 
+0

對不起,它不能很好地工作。在超時後取消了100個任務:( – SuperJMN

+0

)您可以嘗試使用'EventLoopScheduler'嗎? – Enigmativity

+0

謝謝!我已經嘗試過了,它的表現相同。請看@Dorus的答案,因爲它很簡單並且按預期工作沒有太多的麻煩。 – SuperJMN