2013-12-11 209 views
0

我的目標是創建一個產生初始任務數量(完全相同)的系統。這些任務將抓取並執行一些後臺操作(從數據庫),然後返回,此時應該生成一個新任務並執行相同的操作。異步任務按順序執行?

我已經寫了下面的代碼作爲概念證明,但它似乎像我所有的任務正在執行1乘1,而不是並行。

代碼:

public Form1() 
    { 
     InitializeComponent(); 
    } 
    CancellationTokenSource cts; 
    private async void button1_Click(object sender, EventArgs e) 
    { 
     cts = new CancellationTokenSource(); 
     int reqNumberOfThreads = int.Parse(textBox1.Text); 
     try 
     { 
      await startSlaves(cts.Token, reqNumberOfThreads); 
     } 
     catch (OperationCanceledException) 
     { 
      MessageBox.Show("Canceled Starting Threads"); 
     } 
     cts = null; 

    } 
    async Task startSlaves(CancellationToken ct, int threadNum) 
    { 

     List<Task<int>> allTasks = new List<Task<int>>();// ***Add a loop to process the tasks one at a time until none remain. 
     for (int x = 0; x < threadNum; x++) 
     { 
      allTasks.Add(beginSlaveOperation(ct, x)); 
     } 
     // ***Add a loop to process the tasks one at a time until none remain. 
     while (allTasks.Count <= threadNum) 
     { 
      // Identify the first task that completes. 
      Task<int> output = await Task.WhenAny(allTasks); 
      allTasks.Remove(output); 
      allTasks.Add(beginSlaveOperation(ct, output.Result)); 
     } 
    } 
    public void performExampleImportOperationThread(int inputVal, int whoAmI) 
    { 
     System.Threading.Thread.Sleep(inputVal*10); 
     System.Console.Write("Thread number" + whoAmI.ToString() + "has finished after "+inputVal.ToString()+" secs \n"); 
    } 
    async Task<int> beginSlaveOperation(CancellationToken ct, int whoAmI) 
    { 
     Random random = new Random(); 
     int randomNumber = random.Next(0, 100);//Get command from microSched and remove it from sched 
     performExampleImportOperationThread(randomNumber, whoAmI);//perform operation 
     return whoAmI; 
    } 

    private void button2_Click(object sender, EventArgs e) 
    { 
     if (cts != null) 
     { 
      cts.Cancel(); 
     } 
    } 

輸出:

Thread number0has finished after 29 secs 
Thread number1has finished after 45 secs 
Thread number2has finished after 59 secs 
Thread number0has finished after 39 secs 
Thread number1has finished after 13 secs 
Thread number2has finished after 44 secs 
Thread number0has finished after 21 secs 
Thread number1has finished after 62 secs 
Thread number2has finished after 62 secs 
Thread number0has finished after 25 secs 
Thread number1has finished after 86 secs 
Thread number2has finished after 10 secs 
Thread number0has finished after 4 secs 
Thread number1has finished after 24 secs 
Thread number2has finished after 84 secs 
Thread number0has finished after 73 secs 
Thread number1has finished after 19 secs 
Thread number2has finished after 72 secs 
Thread number0has finished after 82 secs
+3

爲什麼downvotes? – paqogomez

回答

1

beginSlaveOperation被標記爲async,和你怎麼稱呼它,彷彿它是異步的,但你從來沒有真正await任何東西,所以它是怎麼回事同步運行。 (它使呼叫者同步運行,呼叫者同步運行,等等,以便您的整個應用程序同步運行。)

標記方法async不會奇蹟般地使其在新線程中運行。它所做的就是讓您使用await關鍵字。如果你沒有使用它,你所做的就是同步執行一堆代碼並將結果包裝在一個完成的任務中。你甚至應該得到一個編譯器警告這樣做。

可以修復通過具有performExampleImportOperationThreadasync方法和,而不是使用Thread.Sleep(...),使用await Task.Delay(...)。等待performExampleImportOperationThread使beginSlaveOperation實際上是異步的。

或者您可能無法完成您正在執行的任何操作並將其全部替換爲Parallel.For,這可以設置MaxDegreesOfParallelism將您限制爲特定數量的併發線程。

+0

感謝您的回答,第一部分解決了我的問題。至於第二部分,我認爲並不適合我目前的需求。我並沒有通過一組預定義的數據循環,而是設置了一些後臺線程來檢查連續工作(通過數據庫或其他數據源)。 – Ramsey

+0

@Ramsey您沒有要迭代的一組數據並不重要。您可以使用'Parallel.For'來運行一段代碼'n'次,就像您可以使用常規'for'循環運行一段代碼'n'次一樣,即使您不'有一個數組來迭代。 – Servy

+0

我明白你在說什麼,但是我需要將N設置爲無窮大,因爲我希望這些任務總能隨時循環以檢查要做的工作。我的目標是創建一個應用程序,讓我可以將它放在新機器上,輕鬆使其成爲運行作業的節點。 實際上,performExampleImportOperationThread()將是幾種運行方法之一,具體取決於從數據庫中分配任務的子任務。 – Ramsey

0

這裏以供將來參考固定的代碼:

public Form1() 
    { 
     InitializeComponent(); 
    } 
    CancellationTokenSource cts; 
    private async void button1_Click(object sender, EventArgs e) 
    { 
     cts = new CancellationTokenSource(); 
     int reqNumberOfThreads = int.Parse(textBox1.Text); 
     try 
     { 
      await startSlaves(cts.Token, reqNumberOfThreads); 
     } 
     catch (OperationCanceledException) 
     { 
      MessageBox.Show("Canceled Starting Threads"); 
     } 
     cts = null; 

    } 
    async Task startSlaves(CancellationToken ct, int threadNum) 
    { 

     List<Task<int>> allTasks = new List<Task<int>>();// ***Add a loop to process the tasks one at a time until none remain. 
     for (int x = 0; x < threadNum; x++) 
     { 
      allTasks.Add(beginSlaveOperation(ct, x)); 
     } 
     // ***Add a loop to process the tasks one at a time until none remain. 
     while (allTasks.Count <= threadNum) 
     { 
      // Identify the first task that completes. 
      Task<int> output = await Task.WhenAny(allTasks); 
      allTasks.Remove(output); 
      allTasks.Add(beginSlaveOperation(ct, output.Result)); 
     } 
    } 
    public async Task performExampleImportOperationThread(int inputVal, int whoAmI) 
    { 
     await Task.Delay(inputVal*100); 
     System.Console.Write("Thread number" + whoAmI.ToString() + "has finished after "+inputVal.ToString()+" secs \n"); 
    } 
    async Task<int> beginSlaveOperation(CancellationToken ct, int whoAmI) 
    { 
     Random random = new Random(); 
     int randomNumber = random.Next(0, 100);//Get command from microSched and remove it from sched 
     await performExampleImportOperationThread(randomNumber, whoAmI);//perform operation 
     return whoAmI; 
    } 

    private void button2_Click(object sender, EventArgs e) 
    { 
     if (cts != null) 
     { 
      cts.Cancel(); 
     } 
    }