2013-08-23 39 views
2

我正在使用Azure服務總線SubscriptionClient.OnMessage方法;配置爲同時處理最多5條消息。等待由Service Bus OnMessage處理的消息完成

在代碼中,我需要等待所有消息完成處理,然後才能繼續(正確關閉Azure工作者角色)。我該怎麼做呢?

SubscriptionClient.Close()會阻塞,直到所有消息都已完成處理?

回答

6

在SubscriptionClient或QueueClient上調用Close將不會阻塞。據我所知,調用關閉會立即關閉實體。我使用Windows Azure SDK 2.0隨附的Service Bus With Queue項目模板快速測試。我在消息處理操作中添加了線程睡眠許多秒,然後在運行時關閉該角色。當消息在線程睡眠中處理時,我看到Close方法被調用,但它肯定沒有等待消息處理完成,角色簡單地關閉了。

爲了妥善處理這個問題,您需要做與處理消息的任何工作者角色(服務總線,Azure存儲隊列或其他任何東西)時所做的相同的事情:跟蹤正在處理的內容以及完成後關閉。有幾種方法可以解決這個問題,但是由於涉及多個線程,所有這些方法都是手動的,並且在這種情況下變得混亂。

鑑於OnMessage的工作方式,您需要在操作中添加一些內容,以查看角色是否被告知關閉,如果是,則不執行任何處理。問題是,當執行OnMessage動作時,它已經有一條消息。您可能需要放棄該消息,但不會退出OnMessage操作,否則在隊列中會有消息時會繼續收到消息。你不能簡單地放棄這條消息並讓執行離開這個動作,因爲這樣系統會傳遞另一條消息(可能是同一條消息),並且這樣做的幾個線程可能會導致消息得到太多的出列計數並且死信。另外,您不能在SubscriptionClient或QueueClient上調用Close,因爲它會在內部停止接收循環,因爲一旦調用close,任何未完成的消息處理都會在調用.Complete,.Abandon等時拋出異常。消息,因爲消息實體現在已關閉。這意味着您無法輕鬆停止收到的消息。

這裏的主要問題是因爲您正在使用OnMessage並通過在OnMessageOptions上設置MaxConcurrentCalls來設置併發消息處理,這意味着啓動和管理線程的代碼會被埋在QueueClient和SubscriptionClient中,並且您不會對此無法控制。您沒有辦法減少線程的數量,或者單獨停止線程等。您需要創建一種方法來將OnMessage操作線程置於一種狀態,在這種狀態下他們知道系統被告知關閉然後完成他們的消息並且不退出該動作以使他們不被連續地分配新的消息。這意味着您可能還需要將MessageOptions設置爲不使用自動完成,並在OnMessage操作中手動調用complete。

必須這樣做可能會嚴重降低使用OnMessage助手的實際好處。在幕後,OnMessage只是簡單地設置一個循環,調用receive的默認超時並將消息傳遞給另一個線程來執行動作(鬆散描述)。因此,通過使用OnMessage方法得到的內容遠遠不必自己編寫該處理程序,但是您遇到的問題是因爲您沒有自己編寫該處理程序而無法控制這些線程。第二十二條軍規。如果你真的需要優雅地停下來,你可能想要離開OnMessage的方法,用線程編寫你自己的接收循環,並在主循環中停止接收新消息,並等待所有工作人員結束。

一個選項,特別是如果消息是冪等的(意味着多次處理它們會得到相同的結果......您應該注意的是),那麼如果它們在中途處理中停止,它們將重新出現在隊列稍後將由另一個實例處理。如果工作本身不是資源密集型的,並且操作是冪等的,那麼這真的可以成爲一種選擇。與由於硬件故障或其他問題而導致實例失敗相同。當然,這不是優雅或優雅,但它肯定會消除我所提到的所有複雜性,而且由於其他故障仍然可能發生。

請注意,當一個實例被告知關閉時,OnStop被調用。你有5分鐘的時間可以延遲這個時間,直到結構關閉它,所以如果你的信息花費超過五分鐘的時間來處理它,如果你試圖優雅地關閉它並不重要,有些將被切斷在處理期間。

+1

感謝您的詳細解答。我會採納你的建議,並自己寫下循環。我希望OnMessage方法能夠成爲多線程的一個很好的捷徑,但它看起來會比它的價值更麻煩。 –

+0

我已經向產品組發送了關於此的反饋,並且我也已將此意見發佈到Windows Azure用戶語音中。 http://www.mygreatwindowsazureidea.com/forums/216926-service-bus/suggestions/4345733-provide-gracefull-shutdown-feature-to-message-pump如果您想看到它,請將其投票。 – MikeWo

2

您可以調整OnMessageAsync從開始等待消息的處理完成,並阻止新的消息進行處理:

下面是執行:

_subscriptionClient.OnMessageAsync(async message => 
     { 
      if (_stopRequested) 
      { 
       // Block processing of new messages. We want to wait for old messages to complete and exit. 
       await Task.Delay(_waitForExecutionCompletionTimeout);     
      } 
      else 
      { 
       try 
       { 
        // Track executing messages 
        _activeTaskCollection[message.MessageId] = message; 

        await messageHandler(message); 
        await message.CompleteAsync(); 
       } 
       catch (Exception e) 
       { 
        // handle error by disposing or doing nothing to force a retry 
       } 
       finally 
       { 
        BrokeredMessage savedMessage; 

        if (!_activeTaskCollection.TryRemove(message.MessageId, out savedMessage)) 
        { 
         _logger.LogWarning("Attempt to remove message id {0} failed.", savedMessage.MessageId); 
        } 
       } 
      } 

     }, onMessageOptions); 

這等待停止的實現完成:

public async Task Stop() 
    { 
     _stopRequested = true; 
     DateTime startWaitTime = DateTime.UtcNow; 

     while (DateTime.UtcNow - startWaitTime < _waitForExecutionCompletionTimeout && _activeTaskCollection.Count > 0) 
     { 
      await Task.Delay(_waitForExecutionCompletionSleepBetweenIterations); 
     } 

     await _subscriptionClient.CloseAsync(); 
     } 

注意_activeTaskCollection是ConcurrentDictionary(我們也可以使用一個計數器,互鎖到c ount正在進行的消息數量,但使用字典可讓您調查發生錯誤時容易發生的事情。