我有以下代碼MSMQ ReceiveById失敗
public class MsmqQueueProvider : IQueueProvider
{
public void VeryfyIsAvailable(string name)
{
var queueAddress = string.Format(@" .\private$\{0}", name);
var message = "There was a problem while starting the NEasyMessaging.";
if (MessageQueue.Exists(queueAddress))
{
using (var queue = new MessageQueue(queueAddress))
{
if (queue.CanWrite && queue.CanRead) return;
if (queue.CanRead == false)
{
message += string.Format("Queue {0} is reachable but not readable", queueAddress);
throw new QueueProviderProviderException(message);
}
message += string.Format("Queue {0} is reachable but not writable", queueAddress);
throw new QueueProviderProviderException(message);
}
}
message += string.Format("Queue {0} cannot be found", queueAddress);
throw new QueueProviderProviderException(message);
}
public QueueMessage Peek(string queueName)
{
var queue = new MessageQueue(string.Format(@" .\private$\{0}", queueName), QueueAccessMode.Peek);
var message = queue.Peek();
// ReSharper disable once PossibleNullReferenceException
return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
}
public QueueMessage Receive(string queueName)
{
var queue = new MessageQueue(string.Format(@" .\private$\{0}", queueName), QueueAccessMode.Receive);
var message = queue.Receive(MessageQueueTransactionType.Automatic);
// ReSharper disable once PossibleNullReferenceException
return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
}
public QueueMessage ReceiveById(string queueName, string messageId)
{
var queue = new MessageQueue(string.Format(@" .\private$\{0}", queueName), QueueAccessMode.Receive);
var message = queue.ReceiveById(messageId, MessageQueueTransactionType.Automatic);
// ReSharper disable once PossibleNullReferenceException
return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
}
public void QueueMessage(string messageContent, string messageName, string queueName)
{
var queueAddress = string.Format(@" .\private$\{0}", queueName);
using (var streamReader = new StringReader(messageContent))
{
var message = new Message
{
TimeToBeReceived = Message.InfiniteTimeout,
TimeToReachQueue = Message.InfiniteTimeout,
Label = messageName,
UseAuthentication = false,
Recoverable = true
};
using (var queue = new MessageQueue(queueAddress, QueueAccessMode.Send))
{
using (var streamWriter = new StreamWriter(message.BodyStream))
{
streamWriter.Write(streamReader.ReadToEnd());
streamWriter.Flush();
queue.Send(message, MessageQueueTransactionType.Automatic);
}
queue.Close();
}
}
}
}
public class UnitOfWork : IUnitOfWork
{
private TransactionScope _transaction;
public void Start()
{
var transactionOptions = new TransactionOptions
{
Timeout = TransactionManager.MaximumTimeout
};
_transaction = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions);
}
public void CompletedWithSuccess()
{
if (Transaction.Current.TransactionInformation.Status == TransactionStatus.Active)
{
_transaction.Complete();
}
_transaction.Dispose();
}
public void CompletedWithFail()
{
_transaction.Dispose();
}
}
public sealed partial class Service : ServiceBase
{
private readonly ILog _log = LogManager.GetLogger(typeof(Service));
private readonly ManualResetEvent _shutdownEvent = new ManualResetEvent(false);
private Thread _workerThread;
private IQueueProvider _queueProvider;
private IEndpointConfiguration _configuration;
private IContainer _container;
public Service()
{
InitializeComponent();
ServiceName = "";
EventLog.Log = "";
}
public void Init()
{
var endpointBootstrap = new EndpointBootstrap();
endpointBootstrap.Initialize();
_container = endpointBootstrap.IocContainer;
_queueProvider = _container.Resolve<IQueueProvider>();
_configuration = _container.Resolve<IEndpointConfiguration>();
_workerThread = new Thread(DoWork) { Name = "Worker Thread", IsBackground = true };
_workerThread.Start();
}
protected override void OnStart(string[] args)
{
Init();
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_workerThread.Join(3000))
{
_workerThread.Abort();
}
}
private void DoWork()
{
while (!_shutdownEvent.WaitOne(0))
{
var queueMessage = _queueProvider.Peek(_configuration.QueueName);
try
{
ProcessMessage(queueMessage);
}
catch (Exception ex)
{
_log.Error(ex);
MoveMessageToErrorQueue(queueMessage.Id);
}
}
}
private void ProcessMessage(QueueMessage message)
{
using (var dependencyScope = _container.BeginLifetimeScope())
{
var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();
unitOfWork.Start();
var messageProcessor = new MessageProcessor(dependencyScope);
try
{
messageProcessor.HandleMessage(message);
_queueProvider.ReceiveById(_configuration.QueueName, message.Id);
}
catch (Exception ex)
{
_log.Error(ex);
unitOfWork.CompletedWithFail();
throw;
}
unitOfWork.CompletedWithSuccess();
}
}
private void MoveMessageToErrorQueue(string messageId)
{
try
{
using (var dependencyScope = _container.BeginLifetimeScope())
{
var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();
unitOfWork.Start();
var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);
try
{
_queueProvider.QueueMessage(message.Body, message.Name, _configuration.QueueErrorName);
unitOfWork.CompletedWithSuccess();
}
catch
{
unitOfWork.CompletedWithFail();
throw;
}
}
}
catch (Exception ex)
{
_log.Error(ex);
}
}
}
基本上我的想法很簡單,至少在紙面上。消息從隊列中取得很好,在開發機器上一切正常,問題是當我們的代碼部署到我們的服務器(Windows 2008)。如果不處理的消息正確,我們從隊列中刪除的消息,並把它放到一個錯誤隊列,問題是,該方法GetById無法找到消息:
private void MoveMessageToErrorQueue(string messageId)
var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);
它工作在dev的盒子很好,但我們只是可以找到一種方法來解決這個問題。
任何幫助,歡迎。
感謝
UPDATE
繼保的評論:
嗨保羅,感謝您的幫助。不幸的是,除非我明白錯誤,否則即使開始接收也不會。現在我從隊列中選擇一條消息,因爲如果稍後通過id接收消息,那麼只有一條線程從該隊列中讀取,這似乎是消息仍然存在的邏輯。那麼爲什麼我認爲我需要按照他們的方式來做事。我查看消息,然後創建一個事務範圍並執行任何處理,當然在執行過程中創建的任何SQL Server會話都會註冊該事務。如果在消息處理過程中出現問題,我需要回滾對數據庫所做的更改,然後回滾事務,但我還需要將失敗的消息放入錯誤隊列中。我不能只在一個事務中執行,從隊列中移除消息嘗試處理消息,如果失敗則將其放入錯誤隊列中,因爲我仍然需要回滾數據庫更改。
所以你在生產機器上得到的消息,但以後找不到它?或者它沒有到達? – Default
嗨默認。消息就是它必須存在的。如果在消息處理期間生成異常,則事務範圍將回滾並創建一個新事務以將該消息從一個隊列移至另一個隊列。消息的id是消息的id,就在處理之前達到峯值,所以消息必須在那裏,或者至少我是這樣,畢竟peek只是檢索消息的副本。 – Marco