2015-05-17 129 views
2

我想實現下面的用例。我有一個將監視Azure存儲隊列的Azure工作者角色,並且當消息進入時,這會觸發一個作業異步運行。如果可能,我想使用TPL,並且需要支持取消的操作,以便在Azure角色OnStop觸發時,可以儘可能優雅地退出作業。 Scott Guthrie發佈的MyFixIt示例幾乎正是我需要的,我已將其用作我的項目的模板。不支持的一個關鍵方面是需要異步運行作業。在FixIt代碼中,一旦作業啓動,將不會處理其他作業,直到完成作業爲止。我的應用程序將處理的一些作業長時間運行,並且我需要工作人員角色能夠注意到其他傳入作業,並在長時間運行的作業正在運行時運行這些作業。Azure Worker角色異步處理作業

這裏的兩個關鍵方法是監視隊列的ProcessMessagesAsync和ProcessMessage,它將在消息進入時運行作業。下面是我所擁有的,它主要工作,除非它不能正確處理CancellationRequest,並且Azure工作者角色將關閉而無需等待作業完成。

 /// <summary> 
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved. 
    /// </summary> 
    /// <param name="token"></param> 
    /// <returns></returns> 
    public virtual async Task ProcessMessagesAsync(CancellationToken token) 
    { 
     CloudQueue queue = _queueClient.GetQueueReference(_queueName); 
     await queue.CreateIfNotExistsAsync(token); 

     while (!token.IsCancellationRequested) 
     { 
      Debug.WriteLine("inLoop"); 
      // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages. 
      // Pass in a cancellation token, because the operation can be long-running. 
      CloudQueueMessage message = await queue.GetMessageAsync(token); 
      if (message != null) 
      { 
       ProcessMessage(message, queue, token); 
      } 
      else 
      { 
       await Task.Delay(500, token); 
      } 
     } 
    } 



    protected virtual async Task ProcessMessage(CloudQueueMessage message, CloudQueue queue, CancellationToken token) 
    { 
     var jobDetails = JobDetails.DeserializeJson(message.AsString); 
     var result = await _jobRunner.RunJob(jobDetails, token); 

     //todo handle error 
     //if (result.Status == JobStatus.Error) 

     await queue.DeleteMessageAsync(message); 
    } 

然後JobRunner運行請求的作業。我寫了一個TestJob,其中我試圖模擬一個可以注意到CancellationRequest的長時間運行的作業,並且在一段短的清理週期後,儘早退出該作業。

public virtual async Task<JobResult> RunJob(JobDetails jobDetails, CancellationToken token) 
    { 
     switch (jobDetails.JobName.ToLower()) 
     { 
      case "testjob": 
       return await TestJob(jobDetails.Args, token); 
     } 
     return new JobResult(JobStatus.Error) { ErrorMessage = "The job requested does not exist." }; 
    } 
    protected virtual async Task<JobResult> TestJob(List<string> jobArgs, CancellationToken token) 
    { 
     var message = "no message"; 
     if (jobArgs != null && jobArgs.Any()) 
      message = jobArgs[0]; 

     return await Task.Run(async() => 
     { 
      Debug.WriteLine(string.Format("Start:{0}", message)); 
      for (int i = 1; i <= 800; i++) 
      { 
       if (token.IsCancellationRequested) 
       { 
        Debug.WriteLine("CancelationRequest in TestJob"); 
        //simulate short time to cleanup and exit early 
        Thread.Sleep(1500); 
        Debug.WriteLine("Cancelation Job Cleanup finsihed."); 
        token.ThrowIfCancellationRequested(); 
       } 
       Thread.Sleep(10); 
      } 

      Debug.WriteLine(string.Format("Finish:{0}", message)); 
      return new JobResult(JobStatus.Success); 
     }); 
    } 

我一直在尋找,現在研究2天,包括TPL數據流庫,而且還沒有能夠拿出一個辦法,使這項工作正常。我覺得ProcessMessage(消息,隊列,令牌)調用沒有正確完成,甚至有一個編譯器警告'因爲這個調用沒有等待......'。但是我不想等待(這是FixIt的例子),因爲在運行完成之前,沒有其他工作會被發現。這似乎不是一個不常見的用例,儘管我似乎無法找到任何描述它的人。

非常感謝您的幫助!

丹尼·格林

+0

你真的應該看看WebJobs;他們爲你處理所有這些。 –

回答

0

出現這種情況的原因是因爲你沒有履行任務從ProcessMessage的返回。由於此ProcessMessageAsync可以在ProcessMessage正常完成或取消之前完成。請記住,您不希望等待ProcessMessage,因爲它會使消息處理按順序進行,我建議您保留正在運行的任務的列表。 換句話說,在ProcessMessageAsync中創建一個List,並將從ProcessMessage返回的任務添加到此列表中。然後在while循環結束時,如果令牌被取消,您應該遍歷這個列表以取消所有掛起的任務。

對不起,我沒有VS方便,但我希望你明白了。

+0

謝謝桑傑,我已經實現你的想法併發布了代碼。看起來我還沒有被允許upvote你的答案,這是我第一次在StackOverflow上發佈任何東西(儘管我在這裏發現了無數的答案!) –

0

謝謝桑傑,基於你的建議,我已經拿出了以下內容。

 /// <summary> 
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved. 
    /// </summary> 
    /// <param name="token"></param> 
    /// <returns></returns> 
    public virtual async Task ProcessMessagesAsync(CancellationToken token) 
    { 
     CloudQueue queue = _queueClient.GetQueueReference(_queueName); 
     await queue.CreateIfNotExistsAsync(token); 

     var runningTasks = new ConcurrentDictionary<int, Task>(); 

     while (!token.IsCancellationRequested) 
     { 
      Debug.WriteLine("inLoop"); 
      // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages. 
      // Pass in a cancellation token, because the operation can be long-running. 
      CloudQueueMessage message = await queue.GetMessageAsync(token); 
      if (message != null) 
      { 
       var t = ProcessMessage(message, queue, token); 
       var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks)); 
       while (true) 
       { 
        if (runningTasks.TryAdd(t.Id, t)) 
         break; 
        Task.Delay(25); 
       }          
      }      
      else 
      { 
       try 
       { 
        await Task.Delay(500, token); 
       } 
       catch (Exception ex) 
       { 
        Debug.WriteLine(ex.Message); 
       } 
      } 
     } 
     while (!runningTasks.IsEmpty) 
     { 
      Debug.WriteLine("Waiting for running tasks"); 
      Task.Delay(500); 
     } 

    } 

    private static void RemoveRunningTask(int id, ConcurrentDictionary<int, Task> runningTasks) 
    { 
     while (true) 
     { 
      Task outTask; 
      if (runningTasks.TryRemove(id, out outTask)) 
       break; 
      Task.Delay(25); 
     } 

    } 

這似乎工作,雖然我覺得它有點笨拙。我開始編碼「ContinueWith」這樣的,但意外的是,進入任務有不同的ID值(我希望它是相同的任務):

    var task = ProcessMessage(message, queue, token).ContinueWith(x => 
       { 
        while (true) 
        { 
         Task outTask; 
         if (runningTasks.TryRemove(x.Id, out outTask)) 
          break; 
         Task.Delay(25); 
        } 

       }); 

UPDATE: 原來,這仍然不起作用,我早些時候測試時會誤解結果。基於該MyFixIt例如,在工作的作用調用OnStop我有以下代碼:

 public override void OnStop() 
    { 
     Debug.WriteLine("OnStop_Begin"); 
     tokenSource.Cancel(); 
     tokenSource.Token.WaitHandle.WaitOne(); 
     base.OnStop(); 
     Debug.WriteLine("Onstop_End"); 
     tokenSource.Dispose(); 
    } 

看來,tokenSource.Token.WaitHandle.WaitOne是不是真的能等到這一切有一個任務對該令牌的引用已完成,因此即使任務仍在完成處理中,該角色也會繼續並停止。有什麼方法可以正確使用令牌來表示取消實際完成時的信號?

謝謝!

更新2

好吧,我想我有一個解決方案,現在工作。看起來CancellationToken.WaitHandle在調用.Cancel時發出信號,所以我不確定在調用.Cancel之後立即使用它的目的是什麼,它似乎總是會立即通過該代碼繼續執行? FixIt的例子就是這樣,但我並不是很瞭解它。爲了達到我的目的,我已經將ProcessMessagesAsync更改爲在ManualResetEventSlim中傳遞,然後在所有任務完成後設置它。然後在OnStop上等待,然後再完成Stop。

 /// <summary> 
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved. 
    /// </summary> 
    /// <param name="token"></param> 
    /// <returns></returns> 
    public virtual async Task ProcessMessagesAsync(CancellationToken token, ManualResetEventSlim reset) 
    { 
     CloudQueue queue = _queueClient.GetQueueReference(_queueName); 
     await queue.CreateIfNotExistsAsync(token); 

     var runningTasks = new ConcurrentDictionary<int, Task>(); 

     while (!token.IsCancellationRequested) 
     { 
      Debug.WriteLine("inLoop"); 
      // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages. 
      // Pass in a cancellation token, because the operation can be long-running. 
      CloudQueueMessage message = await queue.GetMessageAsync(token); 
      if (message != null) 
      { 
       var t = ProcessMessage(message, queue, token); 
       var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks)); 


       while (true) 
       { 
        if (runningTasks.TryAdd(t.Id, t)) 
         break; 
        await Task.Delay(25); 
       }          
      }      
      else 
      { 
       try 
       { 
        await Task.Delay(500, token); 
       } 
       catch (Exception ex) 
       { 
        Debug.WriteLine(ex.Message); 
       } 
      } 
     } 
     while (!runningTasks.IsEmpty) 
     { 
      Debug.WriteLine("Waiting for running tasks"); 
      await Task.Delay(500); 
     } 
     Debug.WriteLine("All tasks have finished, exiting ProcessMessagesAsync."); 
     reset.Set(); 
    } 
     public override void OnStop() 
    { 
     Debug.WriteLine("OnStop_Begin"); 
     tokenSource.Cancel(); 
     tokenSource.Token.WaitHandle.WaitOne(); 
     _reset.Wait(); 
     base.OnStop(); 
     Debug.WriteLine("Onstop_End"); 
     tokenSource.Dispose(); 
    } 
+0

你仍然有這個代碼的問題。您實際上並未使用您創建的任務列表。當令牌被取消時,您應該對所有正在運行的任務執行「等待」。理解問題的核心是while循環在等待創建的所有任務之前過早地完成。即使你的手動重置偶數方法並不總是在我看來。 –

+0

我的測試似乎表明它正在工作。我所做的是在任務開始時添加到列表中,然後在完成時刪除(使用ContinueWith)。因此任何時候runningTasks都有任何仍在運行的任務。因此,當取消發生時,它會跳出主要while循環,但會立即點擊第二個while循環,它將一直保留,直到runningTasks列表爲空(所有任務已完成)。然後,一旦它過去了,它將重置手動復位信號OnStop繼續。 –