2014-07-07 37 views
3

我爲標題提前道歉,但它是描述行爲的最好的想法。微軟TPL數據流 - 同步處理關聯請求

要求是處理消息總線的請求。 請求可能與相關或分組這些請求的id相關。 我想要的行爲是請求流同步處理關聯ID。 但是,不同的ID可以異步處理。

我正在使用concurrentdictionary來跟蹤正在處理的請求和linkto中的謂詞。

這是假設提供相關請求的同步處理。

但是,我得到的行爲是第一個請求得到處理,第二個請求被丟棄。

我附上了來自控制檯應用程序的示例代碼來模擬問題。

任何方向或反饋將不勝感激。

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace ConsoleApplication2 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var requestTracker = new ConcurrentDictionary<string, string>(); 

      var bufferBlock = new BufferBlock<Request>(); 

      var actionBlock = new ActionBlock<Request>(x => 
      { 
       Console.WriteLine("processing item {0}",x.Name); 
       Thread.Sleep(5000); 
       string itemOut = null; 
       requestTracker.TryRemove(x.Id, out itemOut); 
      }); 

      bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name)); 


      var publisher = Task.Run(() => 
      { 
       var request = new Request("item_1", "first item"); 
       bufferBlock.SendAsync(request); 

       var request_1 = new Request("item_1", "second item"); 
       bufferBlock.SendAsync(request_1); 

      }); 

      publisher.Wait(); 
      Console.ReadLine(); 
     } 
    } 

    public class Request 
    { 
     public Request(string id, string name) 
     { 
      this.Id = id; 
      this.Name = name; 
     } 
     public string Id { get; set; } 
     public string Name { get; set; } 
    } 
} 
+0

您應該讓您的異常通過數據流的管道傳播,以便您可以查看出了什麼問題。在[MSDN演練](http://msdn.microsoft.com/en-us/library/hh228604(v = vs.110).aspx)末尾查看MS的完整示例。然後你可以處理AggregateException來找出錯誤。 – JNYRanger

+2

你的意思是你想讓一個具有相同ID的組被一個接一個地處理,而組可以被同時處理?如果是這樣就有你的答案:http://stackoverflow.com/q/21010024/885318 – i3arnon

+0

@ I3arnon - 你的解決方案似乎是我正在尋找。我在這裏有點懶,但也許你有更多的細節。我猜你所得到的密鑰是動態的,所以基本上,突發的消息將具有相同的密鑰並且保持不斷變化。當一個動作塊忙於委託來處理消息時,它會更新一個字典,說我忙於處理這個請求,並且任何與該關鍵字匹配的後續請求都被委託給該動作塊?我對麼? – rizan

回答

1

我相信這是因爲您的LinkTo()設置不正確。通過有一個LinkTo()並傳遞一個函數作爲參數,您正在添加一個條件。所以這條線:

bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id, x.Name)); 

基本上是說,從bufferBlock傳遞數據給actionBlock如果你能添加到您的併發字典,它並不一定意義(至少在你的示例代碼)

相反,你應該將你的bufferBlock鏈接到沒有lambda的actionblock,因爲在這種情況下你不需要條件鏈接(至少我不這麼認爲是基於你的示例代碼)。

此外,看一看這太問題,看看你是否應該使用SendAsync()Post()因爲Post()可以更容易來處理簡單地增加數據進入管道:TPL Dataflow, whats the functional difference between Post() and SendAsync()?。 SendAsync將返回一個任務,而Post將基於成功進入管道返回true/false。

因此,要基本上找出出了什麼問題,你需要處理你的塊的延續。有在這裏是一個很好的教程超過在MSDN中的TPL數據流介紹:Create a DataFlow Pipeline它將基本上是這樣的:

//link to section 
bufferBlock.LinkTo(actionBlock); 
//continuations 
bufferBlock.Completion.ContinueWith(t => 
{ 
    if(t.IsFaulted) ((IDataFlowBlock).actionBlock).Fault(t.Exception); //send the exception down the pipeline 
    else actionBlock.Complete(); //tell the next block that we're done with the bufferblock 
}); 

然後,您可以捕獲該異常(AggregateException)等待管道時。你是否真的需要在實際代碼中使用concurrentdictionary來進行跟蹤,因爲這可能會導致無法添加時的問題,因爲當linkto謂詞返回false時,它不會將數據傳遞到管道的下一個塊。

+0

謝謝JNYRanger,我會試試這個。 TryAdd的TryAdd的目的是爲了線程安全。另一個是併發字典爲讀取進行了優化。在我的情況2關聯ID通過檢查,並試圖添加。因此,我選擇了TryAdd,在這種情況下,一方會贏得比賽的勝利,另一方會被擱置。 – rizan

+1

@rizan有意義,但爲了讓所有事情都能通過,你需要第二個'LinkTo()'來處理添加到ConcurrentDictionary中的失敗,否則你的管道將會中斷,這就是你正在經歷的。 – JNYRanger

+0

您的方法和您提供的樣品在可以等待完成的設置中表現出色。我所做的就是用I3arnon的方法進行一些微調。我使用TDD先寫測試,然後覆蓋所有基地。十分感謝你的幫助。 – rizan

2
  1. 你說你要並行處理的一些請求(至少我認爲這就是你所說的「異步」的意思),但ActionBlock默認情況下不平行。要更改該設置,請設置MaxDegreeOfParallelism

  2. 您正在嘗試使用TryAdd()作爲過濾器,但不會原因有二:

    1. 過濾器被調用一次,它不會自動重試或類似的東西。這意味着如果一件物品沒有通過,它將永遠不會通過,即使在阻塞它的物品完成之後。
    2. 如果某個項目卡在某個塊的輸出隊列中,則沒有其他項目會從該塊中移出。這可能會大大降低並行性水平,即使您以某種方式解決了上一個問題。
  3. 我認爲這裏最簡單的解決方案是爲每個組設置一個塊,這樣每個組的項目將按順序處理,但是來自不同組的項目將被並行處理。在代碼中,它可能看起來像這樣:

    var processingBlocks = new Dictionary<string, ActionBlock<Request>>(); 
    
    var splitterBlock = new ActionBlock<Request>(request => 
    { 
        ActionBlock<Request> processingBlock; 
    
        if (!processingBlocks.TryGetValue(request.Id, out processingBlock)) 
        { 
         processingBlock = processingBlocks[request.Id] = 
          new ActionBlock<Request>(r => /* process the request here */); 
        } 
    
        processingBlock.Post(request); 
    }); 
    

    這種方法的問題是組的處理塊永遠不會消失。如果你負擔不起(這是一個內存泄漏),因爲你將有大量的組,那麼hashing approach suggested by I3arnon是要走的路。

+0

感謝您的幫助svick - 其實我最終使用了I3arnon方法。對於我嘗試做的事情來說,更強大,更不容易出錯。 – rizan