2012-12-01 49 views
5

如何合併基於TPL的任務的List<T>以便以後執行?如何合併兩個Linq IEnumerable <T>查詢而不運行它們?

public async IEnumerable<Task<string>> CreateTasks(){ /* stuff*/ } 

我的假設是.Concat() ......

 void MainTestApp() // Full sample available upon request. 
    { 
     List<string> nothingList = new List<string>(); 
     nothingList.Add("whatever"); 
     cts = new CancellationTokenSource(); 

     delayedExecution = 
      from str in nothingList 
      select AccessTheWebAsync("", cts.Token); 
     delayedExecution2 = 
      from str in nothingList 
      select AccessTheWebAsync("1", cts.Token); 

     delayedExecution = delayedExecution.Concat(delayedExecution2); 
    } 


    /// SNIP 

    async Task AccessTheWebAsync(string nothing, CancellationToken ct) 
    { 
     // return a Task 
    } 

我要確保這不會產卵任何任務或評估任何東西。實際上,我想我問的是「什麼邏輯上執行IQueryable返回數據的東西」?

背景

因爲我在做遞歸,我不希望執行,直到正確的時間,究竟是合併,如果多次調用的結果正確的方法是什麼?

如果它的事項,我想運行此命令來啓動,然後該代碼的所有任務var AllRunningDataTasks = results.ToList();的:

while (AllRunningDataTasks.Count > 0) 
{ 
    // Identify the first task that completes. 
    Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningDataTasks); 

    // ***Remove the selected task from the list so that you don't 
    // process it more than once. 
    AllRunningDataTasks.Remove(firstFinishedTask); 

    // TODO: Await the completed task. 
    var taskOfTableResult = await firstFinishedTask; 

    // Todo: (doen't work) 
    TrustState thisState = (TrustState)firstFinishedTask.AsyncState; 

    // TODO: Update the concurrent dictionary with data 
    // thisState.QueryStartPoint + thisState.ThingToSearchFor 

    Interlocked.Decrement(ref thisState.RunningDirectQueries); 
    Interlocked.Increment(ref thisState.CompletedDirectQueries); 

    if (thisState.RunningDirectQueries == 0) 
    { 
     thisState.TimeCompleted = DateTime.UtcNow; 
    } 
} 
+0

爲什麼concat不工作,它應該工作?另外,你不想運行任務,但運行查詢很好,Rght? – Tilak

+0

@Tilak我的工作重點是任務,這是我第一次在任務或查詢中進行這項工作。我從來沒有這樣做過查詢,但我記得讀過Concat是如何完成的。 – LamonteCristo

+0

@Tilak也許我在我的代碼中發現了一個錯誤...將很快更新 – LamonteCristo

回答

0

以下是哈克的方式來獲得數據合併......我不就像我不得不在Main或這個示例的其他幾個方面使用「nothingList」一樣,但似乎完成了這項工作並允許我合併未完成的任務。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 
using System.Windows; 
using System.Windows.Controls; 
using System.Windows.Data; 
using System.Windows.Documents; 
using System.Windows.Input; 
using System.Windows.Media; 
using System.Windows.Media.Imaging; 
using System.Windows.Navigation; 
using System.Windows.Shapes; 

// Add a using directive and a reference for System.Net.Http. 
using System.Net.Http; 

// Add the following using directive. 
using System.Threading; 


namespace ProcessTasksAsTheyFinish 
{ 
    public partial class MainWindow : Window 
    { 
     // Declare a System.Threading.CancellationTokenSource. 
     CancellationTokenSource cts; 
     List<IEnumerable<Task>> launchList = new List<IEnumerable<Task>>(); 

     public MainWindow() 
     { 
      InitializeComponent(); 

      List<string> nothingList = new List<string>(); 
      nothingList.Add("whatever"); 

      cts = new CancellationTokenSource(); 

      delayedExecution = 
       from str in nothingList 
       select AccessTheWebAsync("", cts.Token); 


      List<string> nothingList2 = new List<string>(); 
      nothingList2.Add("whatever"); 

      delayedExecution2 = 
       from str in nothingList2 
       select AccessTheWebAsync("1", cts.Token); 


      launchList.Add(delayedExecution); 
      launchList.Add(delayedExecution2); 

      delayedExecution = delayedExecution.Concat(delayedExecution2); 
     } 
     IEnumerable<Task> delayedExecution = null; 
     IEnumerable<Task> delayedExecution2 = null; 

     private async void startButton_Click(object sender, RoutedEventArgs e) 
     { 
      resultsTextBox.Clear(); 

      // Instantiate the CancellationTokenSource. 

      try 
      { 
       // ***Set up the CancellationTokenSource to cancel after 25 seconds. 
       //cts.CancelAfter(250000); 

       var test = delayedExecution;// AccessTheWebAsync("", cts.Token); 

       var testList = test.ToList(); 

       while (testList.Count() > 0) 
       { 
        var firstFinishedTask = await Task.WhenAny(testList); 
        testList.Remove(firstFinishedTask); 

         await firstFinishedTask; 
       } 

       resultsTextBox.Text += "\r\nDownloads complete."; 
      } 
      catch (OperationCanceledException tee) 
      { 
       resultsTextBox.Text += "\r\nDownloads canceled.\r\n"; 
      } 
      catch (Exception) 
      { 
       resultsTextBox.Text += "\r\nDownloads failed.\r\n"; 
      } 

      cts = null; 
     } 


     private void cancelButton_Click(object sender, RoutedEventArgs e) 
     { 
      if (cts != null) 
      { 
       cts.Cancel(); 
      } 
     } 


     async Task<string> AccessTheWebAsync(string nothing, CancellationToken ct) 
     { 
      // CHANGE THIS VALUE TO CONTROL THE TESTING 
      bool delayConversionOfQueryToList = false; 

      HttpClient client = new HttpClient(); 

      // Make a list of web addresses. 
      List<string> urlList = null; 

      if (nothing == "1") 
      { 
       urlList = SetUpURLList2(); 
      } 
      else urlList = SetUpURLList(); 

      // ***Create a query that, when executed, returns a collection of tasks. 
      IEnumerable<Task<int>> downloadTasksQuery = 
       from url in urlList select ProcessURL(url, client, ct); 

      // DEBUG!!! 
      if (delayConversionOfQueryToList == true) 
      { 
       await Task.Delay(10000); 
       resultsTextBox.Text += String.Format("\r\nDelay of IQueryable complete. Tip: Did you see any IsRunning messages?"); 
      } 

      // ***Use ToList to execute the query and start the tasks. 
      List<Task<int>> downloadTasks = downloadTasksQuery.ToList(); 

      // DEBUG!!! 
      if (delayConversionOfQueryToList == false) 
      { 
       await Task.Delay(10000); 
       resultsTextBox.Text += String.Format("\r\nDelay of .ToList() complete. Tip: Did you see any IsRunning messages?"); 
      } 

      // ***Add a loop to process the tasks one at a time until none remain. 
      while (downloadTasks.Count() > 0) 
      { 
       // Identify the first task that completes. 
       Task<int> firstFinishedTask = await Task.WhenAny(downloadTasks); 

       resultsTextBox.Text += String.Format("\r\nID {0}", firstFinishedTask.Id); 

       // ***Remove the selected task from the list so that you don't 
       // process it more than once. 
       downloadTasks.Remove(firstFinishedTask); 

       // Await the completed task. 
       int length = await firstFinishedTask; 
       resultsTextBox.Text += String.Format("\r\nLength of the download: {0}", length); 
      } 

      return nothing; 
     } 


     private List<string> SetUpURLList() 
     { 
      List<string> urls = new List<string> 
      { 
       "http://msdn.microsoft.com", 
       "http://msdn.microsoft.com/library/windows/apps/br211380.aspx", 
       "http://msdn.microsoft.com/en-us/library/hh290136.aspx", 
       "http://msdn.microsoft.com/en-us/library/dd470362.aspx", 
       "http://msdn.microsoft.com/en-us/library/aa578028.aspx", 
       "http://msdn.microsoft.com/en-us/library/ms404677.aspx", 
       "http://msdn.microsoft.com/en-us/library/ff730837.aspx" 
      }; 
      return urls; 
     } 
     private List<string> SetUpURLList2() 
     { 
      List<string> urls = new List<string> 
      { 
       "http://www.google.com", 

      }; 
      return urls; 
     } 

     async Task<int> ProcessURL(string url, HttpClient client, CancellationToken ct) 
     { 
      resultsTextBox.Text += String.Format("\r\nIS RUNNING {0}", url); 

      // GetAsync returns a Task<HttpResponseMessage>. 
      HttpResponseMessage response = await client.GetAsync(url, ct); 
      // Retrieve the website contents from the HttpResponseMessage. 
      byte[] urlContents = await response.Content.ReadAsByteArrayAsync(); 

      // Thread.Sleep(3000); 
      // await Task.Delay(1000, ct); 
      return urlContents.Length; 
     } 
    } 
} 

// Sample Output: 

IS RUNNING http://msdn.microsoft.com 
IS RUNNING http://msdn.microsoft.com/library/windows/apps/br211380.aspx 
IS RUNNING http://msdn.microsoft.com/en-us/library/hh290136.aspx 
IS RUNNING http://msdn.microsoft.com/en-us/library/dd470362.aspx 
IS RUNNING http://msdn.microsoft.com/en-us/library/aa578028.aspx 
IS RUNNING http://msdn.microsoft.com/en-us/library/ms404677.aspx 
IS RUNNING http://msdn.microsoft.com/en-us/library/ff730837.aspx 
IS RUNNING http://www.google.com 
Delay of .ToList() complete. Tip: Did you see any IsRunning messages? 
ID 1 
Length of the download: 48933 
ID 2 
Length of the download: 375328 
ID 3 
Length of the download: 220428 
ID 4 
Length of the download: 222256 
ID 5 
Length of the download: 229330 
ID 6 
Length of the download: 136544 
ID 7 
Length of the download: 207171 
Delay of .ToList() complete. Tip: Did you see any IsRunning messages? 
ID 8 
Length of the download: 43945 
Downloads complete. 
2

要回答具體問題「邏輯上執行IQueryable到返回數據的東西」?那將會是強制產生至少一個價值的東西,或者迫使發現是否有價值。

例如,ToListToArrayFirstSingleSingleOrDefault,並且Count將所有力的評價。 (雖然First不會評估整個集合 - 它會檢索第一個項目然後停止。)這些都必須至少開始檢索值,因爲如果沒有這樣做,它們都無法返回它們返回的值。在ToListToArray的情況下,這些返回完全填充的非懶惰集合,這就是爲什麼他們必須評估所有內容。返回單個項目的方法至少需要詢問第一個項目,然後將繼續檢查評估是否繼續(如果結果更多,則拋出異常)。使用foreach遍歷查詢也將強制評估。 (再次,這是出於同樣的原因:你問它的實際值從集合所以它必須爲他們提供。)

Concat不會立即評估,因爲它並不需要 - 這是隻有當你詢問連接序列的值,它需要詢問其輸入的值。

順便說一句,雖然你問了關於IQueryable你沒有在這裏的例子中使用它。這可能很重要,因爲與實際獲得的LINQ to Objects實現(您獲得的原始IEnumerable<T>)相比,它的工作方式存在一些差異。我不認爲它在這個例子中有所作爲,但它讓我懷疑你的原始代碼和你在這裏發佈的版本是否有變化?這很重要,因爲不同的LINQ提供者可以以不同的方式做事。ConcatIEnumerable<T>味道絕對使用延遲評估,雖然我認爲對於大多數其他LINQ實現來說,它並不是絕對給定的。

如果您需要多次使用結果,並且希望確保您只評估一次,但在實際需要之前您不評估它們,那麼通常的方法是在電話號碼處撥打ToList點,你肯定需要評估,然後堅持到結果List<T>,所以你可以再次使用它。一旦獲得了List<T>(或數組)表格中的數據,您可以隨意多次使用該列表。

順便說一句,你的第一個問題,有一些問題:「我如何合併爲以後執行基於TPL的任務列表」

通常,如果您已經有TPL任務,那麼您無法阻止它執行。 (這是一個例外,如果你直接構造一個Task而不是使用其中一種更常規的方式來創建它,它將不會真正運行,直到你告訴它。但是通常,返回任務的API返回活的,也就是說,他們可能已經在運行,或者甚至完成了,直到你把它們放在手上時)。

在你的例子中的「稍後執行」來自事實上你實際上沒有列表首先要完成的任務。 (如果你實際上有一個List<T>的任務,那麼「以後的執行」將不會成爲一個選項。)你所擁有的是一些枚舉類型,如果你要評估它們,它們會創建任務。創建任務的行爲在任何返回任務的TAP樣式API中啓動它的行爲是不可分割的。

基於對你寫的休息,我認爲你真正問的是:

「我如何合併多個在推遲潛在的可枚舉的評價方式IEnumerable<Task<T>>對象爲單一IEnumerable<Task<T>>直到組合的枚舉本身被評估爲止?「

Concat應該爲此工作。

+0

有趣...我使用Azure存儲來獲取按需加載的樹,並且我不想執行子節點直到所有在相同深度的分支都被搜索到了。 Azure存儲使用Begin..End語義。我將它們包裝在TPL中,如http://stackoverflow.com/q/13216475/328397中所述 – LamonteCristo