2013-10-26

We have an application that periodically receives multimedia messages and should reply to them. We want to process the data on multiple threads at the same time.

We currently use a single thread that first receives the messages and then processes them one by one. This works, but it is slow.

So we are now considering running the same process on multiple threads concurrently.

Is there any simple way to allow parallel processing of the incoming records, while avoiding having two threads mistakenly process the same record?


Short and sweet -> this will spark a discussion... – igrimpe


How do you receive the records? If it's over TCP/IP, you could have a single listening thread that spawns a thread for each incoming record. –


It's hard to give a specific answer. The options range from the Parallel.For method in .NET to an Azure Service Bus with multiple scalable worker roles. If you can provide more information, or even some sample code, we should be able to give more concrete advice. –

Answer


Is there any simple way to allow parallel processing of the incoming records, while avoiding having two threads mistakenly process the same record?

Yes, it is actually not too hard. What you want to do is known as the "Producer-Consumer pattern".

If your message receiver can only run on one thread at a time, but your message "processors" can run on many threads at once, you just need a BlockingCollection to store the messages that are waiting to be processed:

public sealed class MessageProcessor : IDisposable 
{ 
    public MessageProcessor() 
     : this(-1) 
    { 
    } 

    public MessageProcessor(int maxThreadsForProcessing) 
    { 
     _maxThreadsForProcessing = maxThreadsForProcessing; 
     _messages = new BlockingCollection<Message>(); 
     _cts = new CancellationTokenSource(); 

     _messageProcessorThread = new Thread(ProcessMessages); 
     _messageProcessorThread.IsBackground = true; 
     _messageProcessorThread.Name = "Message Processor Thread"; 
     _messageProcessorThread.Start(); 
    } 

    public int MaxThreadsForProcessing 
    { 
     get { return _maxThreadsForProcessing; } 
    } 

    private readonly BlockingCollection<Message> _messages; 
    private readonly CancellationTokenSource _cts; 
    private readonly Thread _messageProcessorThread; 
    private bool _disposed = false; 
    private readonly int _maxThreadsForProcessing; 


    /// <summary> 
    /// Add a new message to be queued up and processed in the background. 
    /// </summary> 
    public void ReceiveMessage(Message message) 
    { 
     _messages.Add(message); 
    } 

    /// <summary> 
    /// Signals the system to stop processing messages. 
    /// </summary> 
    /// <param name="finishQueue">Should the queue of messages waiting to be processed be allowed to finish</param> 
    public void Stop(bool finishQueue) 
    { 
     _messages.CompleteAdding(); 
     if(!finishQueue) 
      _cts.Cancel(); 

     //Wait for the message processor thread to finish its work. 
     _messageProcessorThread.Join(); 
    } 

    /// <summary> 
    /// The background thread that processes messages in the system 
    /// </summary> 
    private void ProcessMessages() 
    { 
     try 
     { 
      Parallel.ForEach(_messages.GetConsumingEnumerable(), 
         new ParallelOptions() 
         { 
          CancellationToken = _cts.Token, 
          MaxDegreeOfParallelism = MaxThreadsForProcessing 
         }, 
         ProcessMessage); 
     } 
     catch (OperationCanceledException) 
     { 
      //Don't care that it happened, just don't want it to bubble up as an unhandled exception. 
     } 
    } 

    private void ProcessMessage(Message message, ParallelLoopState loopState) 
    { 
     //Here be dragons! (or your code to process a message, your choice :-)) 

     //Use if(_cts.Token.IsCancellationRequested || loopState.ShouldExitCurrentIteration) to test if 
     // we should quit out of the function early for a graceful shutdown. 
    } 

    public void Dispose() 
    { 
     if(!_disposed) 
     { 
      if(_cts != null && _messages != null && _messageProcessorThread != null) 
       Stop(true); //This line will block till all queued messages have been processed, if you want it to be quicker you need to call `Stop(false)` before you dispose the object. 

      if(_cts != null) 
       _cts.Dispose(); 

      if(_messages != null) 
       _messages.Dispose(); 

      GC.SuppressFinalize(this); 
      _disposed = true; 
     } 
    } 

    ~MessageProcessor() 
    { 
     //Nothing to do, just making FXCop happy. 
    } 

} 
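For illustration, here is a minimal sketch of how the class above might be wired up. The `ReceiveLoop` method and the `Message` type are placeholders for whatever your application actually uses to receive records:

```csharp
// Hypothetical wiring; substitute your own message source and Message type.
using (var processor = new MessageProcessor(maxThreadsForProcessing: 4))
{
    // The receive loop stays single-threaded; processing fans out
    // across up to 4 threads inside the processor.
    foreach (Message message in ReceiveLoop())
        processor.ReceiveMessage(message);

    // Drain everything still queued, then shut down gracefully.
    processor.Stop(finishQueue: true);
}
```

Because each message is added to the BlockingCollection once and taken from it exactly once, no two processing threads can ever see the same record.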

I highly recommend reading the free book Patterns for Parallel Programming; it goes into some detail about this kind of work, and an entire chapter explains the producer-consumer pattern in depth.


UPDATE: There are some performance problems with feeding GetConsumingEnumerable() into Parallel.ForEach(), because the default partitioner buffers items in chunks before handing them to worker threads. Instead, use the ParallelExtensionsExtras library and its extension method GetConsumingPartitioner():

public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection) 
{ 
    return new BlockingCollectionPartitioner<T>(collection); 
} 

private class BlockingCollectionPartitioner<T> : Partitioner<T> 
{ 
    private BlockingCollection<T> _collection; 

    internal BlockingCollectionPartitioner(
     BlockingCollection<T> collection) 
    { 
     if (collection == null) 
      throw new ArgumentNullException("collection"); 
     _collection = collection; 
    } 

    public override bool SupportsDynamicPartitions { 
     get { return true; } 
    } 

    public override IList<IEnumerator<T>> GetPartitions(
     int partitionCount) 
    { 
     if (partitionCount < 1) 
      throw new ArgumentOutOfRangeException("partitionCount"); 
     var dynamicPartitioner = GetDynamicPartitions(); 
     return Enumerable.Range(0, partitionCount).Select(_ => 
      dynamicPartitioner.GetEnumerator()).ToArray(); 
    } 

    public override IEnumerable<T> GetDynamicPartitions() 
    { 
     return _collection.GetConsumingEnumerable(); 
    } 
}
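With those two snippets in place (note that the `GetConsumingPartitioner` extension method must be declared inside a static class), the only change needed in the `MessageProcessor` above is the source handed to `Parallel.ForEach`:

```csharp
// Inside MessageProcessor.ProcessMessages(), swap the enumerable
// for the consuming partitioner; everything else stays the same.
Parallel.ForEach(_messages.GetConsumingPartitioner(),
        new ParallelOptions()
        {
            CancellationToken = _cts.Token,
            MaxDegreeOfParallelism = MaxThreadsForProcessing
        },
        ProcessMessage);
```

The partitioner hands each item to a worker as soon as it is taken from the collection, avoiding the chunk-buffering delay of the default enumerable partitioner.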