1

下面是在本地SF集羣中運行並監聽服務總線隊列的無狀態服務偵聽器代碼。爲什麼連續調用ICommunicationListener.Abort()?

我想要的是不斷地在永遠在線的SF服務中監聽隊列命令。代碼改進提示是受歡迎的!

問題#1

中止()連續調用,有效地關閉我的連接。什麼是造成這種行爲,我該如何解決它?在我的理解中,Abort()應該只在未處理的異常或強制關閉的情況下被調用,這兩者我都沒有意識到。

獎金問題

說我們註釋掉中止()的調用CloseClient,它允許正確處理我們的隊列中。在第一條消息之後,CancellationToken的WaitHandle被標記爲已處置並將其傳遞到我的回調中將引發異常。這是什麼造成的?

感謝您的幫助!

using System; 
using System.Threading; 
using System.Threading.Tasks; 
using Microsoft.ServiceBus.Messaging; 
using Microsoft.ServiceFabric.Services.Communication.Runtime; 

namespace Common 
{ 
    public class QueueListener : ICommunicationListener 
    { 
     private readonly string _connectionString; 
     private readonly string _path; 
     private readonly Action<BrokeredMessage> _callback; 

     private QueueClient _client; 

     public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback) 
     { 
      // Set field values 
      _connectionString = connectionString; 
      _path = path; 

      // Save callback action 
      _callback = callback; 
     } 

     public Task<string> OpenAsync(CancellationToken cancellationToken) 
     { 
      // Connect to subscription 
      _client = QueueClient.CreateFromConnectionString(_connectionString, _path); 

      // Configure the callback options 
      var options = new OnMessageOptions 
      { 
       AutoComplete = false 
      }; 

      // Catch and throw exceptions 
      options.ExceptionReceived += (sender, args) => throw args.Exception; 

      // Wire callback on message receipt 
      _client.OnMessageAsync(message => 
      { 
       return Task.Run(() => _callback(message), cancellationToken) 
        .ContinueWith(task => 
        { 
         if (task.Status == TaskStatus.RanToCompletion) 
          message.CompleteAsync(); 
         else 
          message.AbandonAsync(); 
        }, cancellationToken); 
      }, options); 

      return Task.FromResult(_client.Path); 
     } 

     public Task CloseAsync(CancellationToken cancellationToken) 
     { 
      CloseClient(); 
      return Task.FromResult(_client.Path); 
     } 

     public void Abort() 
     { 
      CloseClient(); 
     } 

     private void CloseClient() 
     { 
      // Make sure client is still open 
      if (_client == null || _client.IsClosed) 
       return; 

      // Close connection 
      _client.Close(); 
      _client = null; 
     } 
    } 
} 

回答

0

問題是創建一個ICommunicationListener與服務總線接口的整個概念;聽衆沒有聽任何東西!

將服務總線連接重構爲StatelessService實現中的默認RunAsync()方法可修復問題允許按預期監視取消令牌。

3

取消令牌傳遞給OpenAsync並不意圖的方法的範圍之外使用,就像它是現在。你將它傳遞給OnMessage。 OnMessage將在調用OpenAsync之後調用,但令牌在OpenAsyn完成後放置。

創建一個新的CancellationToken,或使用這個nuget包應該幫助here

+0

謝謝!非常有意義。關於主要問題的任何想法? – denious

+0

我懷疑OnMessage由於置位令牌而生成異常,這會導致事件ExceptionReceived觸發,這會導致未處理的異常,從而觸發Abort。但是你需要調試才能確定。 – LoekD

+0

這不是,但我知道了。再次感謝! – denious

相關問題