我想實現下面的用例。我有一個將監視Azure存儲隊列的Azure工作者角色,並且當消息進入時,這會觸發一個作業異步運行。如果可能,我想使用TPL,並且需要支持取消的操作,以便在Azure角色OnStop觸發時,可以儘可能優雅地退出作業。 Scott Guthrie發佈的MyFixIt示例幾乎正是我需要的,我已將其用作我的項目的模板。不支持的一個關鍵方面是需要異步運行作業。在FixIt代碼中,一旦作業啓動,將不會處理其他作業,直到完成作業爲止。我的應用程序將處理的一些作業長時間運行,並且我需要工作人員角色能夠注意到其他傳入作業,並在長時間運行的作業正在運行時運行這些作業。Azure Worker角色異步處理作業
這裏的兩個關鍵方法是監視隊列的ProcessMessagesAsync和ProcessMessage,它將在消息進入時運行作業。下面是我所擁有的,它主要工作,除非它不能正確處理CancellationRequest,並且Azure工作者角色將關閉而無需等待作業完成。
/// <summary>
/// Continuous loop that monitors the queue and launches jobs when they are retrieved.
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task ProcessMessagesAsync(CancellationToken token)
{
CloudQueue queue = _queueClient.GetQueueReference(_queueName);
await queue.CreateIfNotExistsAsync(token);
while (!token.IsCancellationRequested)
{
Debug.WriteLine("inLoop");
// The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
// Pass in a cancellation token, because the operation can be long-running.
CloudQueueMessage message = await queue.GetMessageAsync(token);
if (message != null)
{
ProcessMessage(message, queue, token);
}
else
{
await Task.Delay(500, token);
}
}
}
protected virtual async Task ProcessMessage(CloudQueueMessage message, CloudQueue queue, CancellationToken token)
{
var jobDetails = JobDetails.DeserializeJson(message.AsString);
var result = await _jobRunner.RunJob(jobDetails, token);
//todo handle error
//if (result.Status == JobStatus.Error)
await queue.DeleteMessageAsync(message);
}
然後JobRunner運行請求的作業。我寫了一個TestJob,其中我試圖模擬一個可以注意到CancellationRequest的長時間運行的作業,並且在一段短的清理週期後,儘早退出該作業。
public virtual async Task<JobResult> RunJob(JobDetails jobDetails, CancellationToken token)
{
switch (jobDetails.JobName.ToLower())
{
case "testjob":
return await TestJob(jobDetails.Args, token);
}
return new JobResult(JobStatus.Error) { ErrorMessage = "The job requested does not exist." };
}
protected virtual async Task<JobResult> TestJob(List<string> jobArgs, CancellationToken token)
{
var message = "no message";
if (jobArgs != null && jobArgs.Any())
message = jobArgs[0];
return await Task.Run(async() =>
{
Debug.WriteLine(string.Format("Start:{0}", message));
for (int i = 1; i <= 800; i++)
{
if (token.IsCancellationRequested)
{
Debug.WriteLine("CancelationRequest in TestJob");
//simulate short time to cleanup and exit early
Thread.Sleep(1500);
Debug.WriteLine("Cancelation Job Cleanup finsihed.");
token.ThrowIfCancellationRequested();
}
Thread.Sleep(10);
}
Debug.WriteLine(string.Format("Finish:{0}", message));
return new JobResult(JobStatus.Success);
});
}
我一直在尋找,現在研究2天,包括TPL數據流庫,而且還沒有能夠拿出一個辦法,使這項工作正常。我覺得ProcessMessage(消息,隊列,令牌)調用沒有正確完成,甚至有一個編譯器警告'因爲這個調用沒有等待......'。但是我不想等待(這是FixIt的例子),因爲在運行完成之前,沒有其他工作會被發現。這似乎不是一個不常見的用例,儘管我似乎無法找到任何描述它的人。
非常感謝您的幫助!
丹尼·格林
你真的應該看看WebJobs;他們爲你處理所有這些。 –