2013-08-24 44 views
0

美好的一天。正確的方法來過濾BlockBuffer.RecieveAsync

我有一個TPL數據流網爲RPC調用

它有兩個unkinked流,其以簡化的方式是這樣的:

輸出流量:

  • BlockBuffer存儲輸出
  • ActionBLock發送輸出到服務器併產生髮送ID

輸入流:

  • while循環收到數據
  • TransformBlock解析數據
  • BlockBuffer保存與sentid

答案有一個問題:當我提出從單獨的呼叫線程我可以混淆答案,所以我需要過濾它。

我的RPC調用:

public async Task<RpcAnswer> PerformRpcCall(Call rpccall) 
{ 
    ... 
    _outputRpcCalls.Post(rpccall); 
    long uniqueId = GetUniq(); // call unique id 
    ... 
    var sent = new Tuple<long, long>(uniqueId, 0); 
    while (_sentRpcCalls.TryReceive(u => u.Item1 == uniqueId, out sent)) ; // get generated id from send function 

    return await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(30)); 
} 

,你可以看到我有UNIQUEID這可以幫助我確定的答案,此調用,但我怎麼可以過濾並等待呢?

是不是有好的方法來獲得一些緩衝區數組(WriteOnceBlock也許?),它將在rpc調用中創建,LinkedTo使用過濾器?

回答

0

好吧,我也沒有發現任何適當的方式,所以我做了一個骯髒的解決辦法

while (true) 
{ 
    answer = await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(5)); 

    if (answer.Success) 
    { 
     if (answer.Answer.Combinator.ValueType.Equals(rpccall.Combinator.ValueType)) 
     { 
      break; 
     } 
     else 
     { 
      // wrong answer - post it back 
      _inputAnswers.Post(answer.Answer); 
     } 

    } 
    else 
    { 
     // answer fail - return it 
     break; 
    } 
} 
0

一種方式做,這將是爲每個ID的新塊,並將其鏈接到的答案塊用謂詞檢查id和MaxMessages設爲1:

Task<Answer> ReceiveAnswerAsync(int uniqueId) 
{ 
    var block = new BufferBlock<Answer>(); 

    _inputAnswers.LinkTo(
     block, 
     new DataflowLinkOptions { MaxMessages = 1, PropagateCompletion = true }, 
     answer => answer.Id == uniqueId); 

    return block.ReceiveAsync(); 
}