2017-02-20 81 views
0

我遇到了一個問題IMessageSessionAsyncHandlerFactoryIMessageSessionAsyncHandler的新實例沒有創建時,寫入量變爲0,然後達到正常水平。Azure服務總線SessionHandler問題與分區隊列

更準確地說,我使用的SessionHandlerOptions的值爲MaxConcurrentSessions。這允許以超過1k msg/s的速度進行讀取。 我正在閱讀的隊列是分區隊列。 隊列中消息的數量是相當穩定的,但不時會下降到0.當數據恢復到正常水平時,SessionFactory不會產生任何處理程序,所以我無法再讀取消息。這就像會議沒有正確回收或持續進行等待。

下面是工廠登記代碼:

private void RegisterHandler() 
    { 
     var sessionHandlerOptions = new SessionHandlerOptions 
     { 
      AutoRenewTimeout = TimeSpan.FromMinutes(1), 
      MessageWaitTimeout = TimeSpan.FromSeconds(1), 
      MaxConcurrentSessions = 500 
     }; 
     _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions); 
    } 

工廠類:

public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory 
{ 
    private readonly Action<BrokeredMessage> _callback; 

    public SessionHandlerFactory(Action<BrokeredMessage> callback) 
    { 
     _callback = callback; 
    } 

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message) 
    { 
     return new SessionHandler(session.SessionId, _callback); 
    } 

    public void DisposeInstance(IMessageSessionAsyncHandler handler) 
    { 
     var disposable = handler as IDisposable; 

     disposable?.Dispose(); 
    } 
} 

而且處理程序:

public class SessionHandler : MessageSessionAsyncHandler 
{ 
    private readonly Action<BrokeredMessage> _callback; 

    public SessionHandler(string sessionId, Action<BrokeredMessage> callback) 
    { 
     SessionId = sessionId; 
     _callback = callback; 
    } 

    public string SessionId { get; } 

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message) 
    { 
     try 
     { 
      _callback(message); 
     } 
     catch (Exception ex) 
     { 
      Logger.Error(...); 
     } 
    } 

我可以看到會話處理程序關閉,並且在寫/讀處於正常水平時處理工廠。但是,一旦隊列清空,就無法創建新的會話處理程序。是否有一個策略來分配會話ID,禁止在一段時間不活動後重新分配相同的會話?

編輯1: 我加入兩張圖片來說明問題: enter image description here

當作家停止並重新啓動,運行的讀者是不能夠儘可能之前閱讀。

那一刻之後創建的會話的數量也比以前低得多: enter image description here

回答

0

消息隊列中的量是相當恆定的,但不時它會下降到0。音量恢復到正常水平,SessionFactory不產生任何處理程序,所以我不能再讀取消息。這就像會議沒有正確回收或持續進行等待。

當使用IMessageSessionHandlerFactory來控制如何創建IMessageSessionAsyncHandler情況下,你可以嘗試登錄您的所有IMessageSessionAsyncHandler實例的創建和銷燬。

根據你的代碼,我在這個問題上創建了一個控制檯應用程序。這裏是我的初始化隊列客戶端和處理消息的代碼片段:

InitializeReceiver

static void InitializeReceiver(string connectionString, string queuePath) 
{ 
    _queueClient = QueueClient.CreateFromConnectionString(connectionString, queuePath, ReceiveMode.PeekLock); 

    var sessionHandlerOptions = new SessionHandlerOptions 
    { 
     AutoRenewTimeout = TimeSpan.FromMinutes(1), 
     MessageWaitTimeout = TimeSpan.FromSeconds(5), 
     MaxConcurrentSessions = 500 
    }; 
    _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(OnMessageHandler), sessionHandlerOptions); 
} 

OnMessageHandler

static void OnMessageHandler(BrokeredMessage message) 
{ 
    var body = message.GetBody<Stream>(); 

    dynamic recipeStep = JsonConvert.DeserializeObject(new StreamReader(body, true).ReadToEnd()); 
    lock (Console.Out) 
    { 
     Console.ForegroundColor = ConsoleColor.Cyan; 
     Console.WriteLine(
      "Message received: \n\tSessionId = {0}, \n\tMessageId = {1}, \n\tSequenceNumber = {2}," + 
      "\n\tContent: [ title = {3} ]", 
      message.SessionId, 
      message.MessageId, 
      message.SequenceNumber, 
      recipeStep.title); 
     Console.ResetColor(); 
    } 
    Task.Delay(TimeSpan.FromSeconds(3)).Wait(); 
    message.Complete(); 
} 

按我的測試,如預期的那樣SessionHandler可以工作時的音量的隊列中的消息從正常到零以及從零到正常一段時間,如下所示:

我也試圖利用QueueClient.RegisterSessionHandlerAsync來測試這個問題,它的工作原理也是如此。另外,我發現這個關於Service Bus Sessions的git示例,你可以參考它。

+0

謝謝布魯斯,我在問題中添加了兩個圖像來說明行爲。 –

相關問題