-1
我們有一個應用程序定期接收多媒體消息,並應回覆它們。同時通過多個線程處理數據
我們目前使用單線程,首先接收消息,然後逐個處理它們。這是工作,但速度很慢。
所以我們現在正在考慮同時進行多個線程的同一個過程。
任何簡單的方法來允許並行處理傳入的記錄,但避免錯誤地處理兩個線程相同的記錄?
我們有一個應用程序定期接收多媒體消息,並應回覆它們。同時通過多個線程處理數據
我們目前使用單線程,首先接收消息,然後逐個處理它們。這是工作,但速度很慢。
所以我們現在正在考慮同時進行多個線程的同一個過程。
任何簡單的方法來允許並行處理傳入的記錄,但避免錯誤地處理兩個線程相同的記錄?
任何簡單的方法來允許並行處理傳入記錄,但避免錯誤地處理兩個線程相同的記錄?
是的,它實際上是不是太硬,你想要做什麼是所謂的「生產者 - 消費者模式」
如果你的消息接收器只能一次處理一個線程,但你的信息「處理器「可以一次你只需要使用一個BlockingCollection來存儲需要處理
public sealed class MessageProcessor : IDisposable
{
public MessageProcessor()
: this(-1)
{
}
public MessageProcessor(int maxThreadsForProcessing)
{
_maxThreadsForProcessing = maxThreadsForProcessing;
_messages = new BlockingCollection<Message>();
_cts = new CancellationTokenSource();
_messageProcessorThread = new Thread(ProcessMessages);
_messageProcessorThread.IsBackground = true;
_messageProcessorThread.Name = "Message Processor Thread";
_messageProcessorThread.Start();
}
public int MaxThreadsForProcessing
{
get { return _maxThreadsForProcessing; }
}
private readonly BlockingCollection<Message> _messages;
private readonly CancellationTokenSource _cts;
private readonly Thread _messageProcessorThread;
private bool _disposed = false;
private readonly int _maxThreadsForProcessing;
/// <summary>
/// Add a new message to be queued up and processed in the background.
/// </summary>
public void ReceiveMessage(Message message)
{
_messages.Add(message);
}
/// <summary>
/// Signals the system to stop processing messages.
/// </summary>
/// <param name="finishQueue">Should the queue of messages waiting to be processed be allowed to finish</param>
public void Stop(bool finishQueue)
{
_messages.CompleteAdding();
if(!finishQueue)
_cts.Cancel();
//Wait for the message processor thread to finish it's work.
_messageProcessorThread.Join();
}
/// <summary>
/// The background thread that processes messages in the system
/// </summary>
private void ProcessMessages()
{
try
{
Parallel.ForEach(_messages.GetConsumingEnumerable(),
new ParallelOptions()
{
CancellationToken = _cts.Token,
MaxDegreeOfParallelism = MaxThreadsForProcessing
},
ProcessMessage);
}
catch (OperationCanceledException)
{
//Don't care that it happened, just don't want it to bubble up as a unhandeled exception.
}
}
private void ProcessMessage(Message message, ParallelLoopState loopState)
{
//Here be dragons! (or your code to process a message, your choice :-))
//Use if(_cts.Token.IsCancellationRequested || loopState.ShouldExitCurrentIteration) to test if
// we should quit out of the function early for a graceful shutdown.
}
public void Dispose()
{
if(!_disposed)
{
if(_cts != null && _messages != null && _messageProcessorThread != null)
Stop(true); //This line will block till all queued messages have been processed, if you want it to be quicker you need to call `Stop(false)` before you dispose the object.
if(_cts != null)
_cts.Dispose();
if(_messages != null)
_messages.Dispose();
GC.SuppressFinalize(this);
_disposed = true;
}
}
~MessageProcessor()
{
//Nothing to do, just making FXCop happy.
}
}
我強烈建議你閱讀免費書籍Patterns for Parallel Programming工作在多個消息的工作,它會在約一些這方面的細節。整個章節詳細解釋了生產者 - 消費者模型。
UPDATE:有一些性能問題GetConsumingEnumerable()
和Parallel.ForEach(
,而是使用庫ParallelExtensionsExtras
和它的新的擴展方法GetConsumingPartitioner()
public static Partitioner<T> GetConsumingPartitioner<T>(
this BlockingCollection<T> collection)
{
return new BlockingCollectionPartitioner<T>(collection);
}
private class BlockingCollectionPartitioner<T> : Partitioner<T>
{
private BlockingCollection<T> _collection;
internal BlockingCollectionPartitioner(
BlockingCollection<T> collection)
{
if (collection == null)
throw new ArgumentNullException("collection");
_collection = collection;
}
public override bool SupportsDynamicPartitions {
get { return true; }
}
public override IList<IEnumerator<T>> GetPartitions(
int partitionCount)
{
if (partitionCount < 1)
throw new ArgumentOutOfRangeException("partitionCount");
var dynamicPartitioner = GetDynamicPartitions();
return Enumerable.Range(0, partitionCount).Select(_ =>
dynamicPartitioner.GetEnumerator()).ToArray();
}
public override IEnumerable<T> GetDynamicPartitions()
{
return _collection.GetConsumingEnumerable();
}
}
簡潔大方 - >將引發討論... – igrimpe
你如何接收記錄?如果使用TCP/IP,您可以有一個監聽線程,這將爲每個傳入記錄創建一個線程。 –
很難給出具體的答案。您可以從.NET中的Parallel.For方法到具有多個可伸縮輔助角色的Azure服務總線。如果你可以指定更多的信息,甚至一些示例代碼,我們應該能夠提供更具體的建議。 –