我遇到了一個問題IMessageSessionAsyncHandlerFactory
IMessageSessionAsyncHandler
的新實例沒有創建時,寫入量變爲0,然後達到正常水平。Azure服務總線SessionHandler問題與分區隊列
更準確地說,我使用的SessionHandlerOptions
的值爲MaxConcurrentSessions
。這允許以超過1k msg/s的速度進行讀取。 我正在閱讀的隊列是分區隊列。 隊列中消息的數量是相當穩定的,但不時會下降到0.當數據恢復到正常水平時,SessionFactory不會產生任何處理程序,所以我無法再讀取消息。這就像會議沒有正確回收或持續進行等待。
下面是工廠登記代碼:
private void RegisterHandler()
{
var sessionHandlerOptions = new SessionHandlerOptions
{
AutoRenewTimeout = TimeSpan.FromMinutes(1),
MessageWaitTimeout = TimeSpan.FromSeconds(1),
MaxConcurrentSessions = 500
};
_queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions);
}
工廠類:
public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
private readonly Action<BrokeredMessage> _callback;
public SessionHandlerFactory(Action<BrokeredMessage> callback)
{
_callback = callback;
}
public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
{
return new SessionHandler(session.SessionId, _callback);
}
public void DisposeInstance(IMessageSessionAsyncHandler handler)
{
var disposable = handler as IDisposable;
disposable?.Dispose();
}
}
而且處理程序:
public class SessionHandler : MessageSessionAsyncHandler
{
private readonly Action<BrokeredMessage> _callback;
public SessionHandler(string sessionId, Action<BrokeredMessage> callback)
{
SessionId = sessionId;
_callback = callback;
}
public string SessionId { get; }
protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
try
{
_callback(message);
}
catch (Exception ex)
{
Logger.Error(...);
}
}
我可以看到會話處理程序關閉,並且在寫/讀處於正常水平時處理工廠。但是,一旦隊列清空,就無法創建新的會話處理程序。是否有一個策略來分配會話ID,禁止在一段時間不活動後重新分配相同的會話?
當作家停止並重新啓動,運行的讀者是不能夠儘可能之前閱讀。
謝謝布魯斯,我在問題中添加了兩個圖像來說明行爲。 –