我爲標題提前道歉,但它是描述行爲的最好的想法。微軟TPL數據流 - 同步處理關聯請求
要求是處理消息總線的請求。 請求可能與相關或分組這些請求的id相關。 我想要的行爲是請求流同步處理關聯ID。 但是,不同的ID可以異步處理。
我正在使用concurrentdictionary來跟蹤正在處理的請求和linkto中的謂詞。
這是假設提供相關請求的同步處理。
但是,我得到的行爲是第一個請求得到處理,第二個請求被丟棄。
我附上了來自控制檯應用程序的示例代碼來模擬問題。
任何方向或反饋將不勝感激。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication2
{
class Program
{
static void Main(string[] args)
{
var requestTracker = new ConcurrentDictionary<string, string>();
var bufferBlock = new BufferBlock<Request>();
var actionBlock = new ActionBlock<Request>(x =>
{
Console.WriteLine("processing item {0}",x.Name);
Thread.Sleep(5000);
string itemOut = null;
requestTracker.TryRemove(x.Id, out itemOut);
});
bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));
var publisher = Task.Run(() =>
{
var request = new Request("item_1", "first item");
bufferBlock.SendAsync(request);
var request_1 = new Request("item_1", "second item");
bufferBlock.SendAsync(request_1);
});
publisher.Wait();
Console.ReadLine();
}
}
public class Request
{
public Request(string id, string name)
{
this.Id = id;
this.Name = name;
}
public string Id { get; set; }
public string Name { get; set; }
}
}
您應該讓您的異常通過數據流的管道傳播,以便您可以查看出了什麼問題。在[MSDN演練](http://msdn.microsoft.com/en-us/library/hh228604(v = vs.110).aspx)末尾查看MS的完整示例。然後你可以處理AggregateException來找出錯誤。 – JNYRanger
你的意思是你想讓一個具有相同ID的組被一個接一個地處理,而組可以被同時處理?如果是這樣就有你的答案:http://stackoverflow.com/q/21010024/885318 – i3arnon
@ I3arnon - 你的解決方案似乎是我正在尋找。我在這裏有點懶,但也許你有更多的細節。我猜你所得到的密鑰是動態的,所以基本上,突發的消息將具有相同的密鑰並且保持不斷變化。當一個動作塊忙於委託來處理消息時,它會更新一個字典,說我忙於處理這個請求,並且任何與該關鍵字匹配的後續請求都被委託給該動作塊?我對麼? – rizan