2012-06-12 86 views
4

我正在嘗試使用TPL數據流,通過移植一些舊的套接字代碼來使用TPL數據流和新的異步功能。儘管API感覺很穩定,但我的代碼仍然感覺凌亂。我想知道我是否在這裏錯過了一些東西。TPL數據流使用異步操作

我的要求如下:一個套接字類公開:打開,關閉,發送和接收方法。所有返回一個任務,因此是異步的。打開和關閉是原子的。發送和接收可以彼此相鄰,儘管兩者一次只能處理1條命令。

邏輯上,這使我對內部控制的下一段代碼:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with 
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair; 
private readonly ActionBlock<Action> connectionBlock; 
private readonly ActionBlock<Action> sendBlock; 
private readonly ActionBlock<Action> receiveBlock; 

// within the constructor: 
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair(); 
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler }); 
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler }); 
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler }); 

到目前爲止都很好。我可以安全地將操作發送到發送和接收模塊,而無需擔心與此同時運行的連接相關操作。另外ActionBlock確保多個發送調用是同步的(同上接收,關閉和打開)。

問題是,沒有簡單的方法將任務傳回給海報。現在我正在使用TaskCompletionSource傳遞結果。喜歡:

public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken) 
{ 
    TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>(); 

    sendBlock.Post(async() => 
    { 
     if (!tcpClient.Connected) 
      throw new InvalidOperationException("Cant send when not open"); 
     else 
     { 
      await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken); 
      resultCompletionSource.SetResult(null); 
     } 
    }); 

    return resultCompletionSource.Task; 
} 

這只是感覺醜陋和笨拙。我的問題是:有沒有辦法使用TPL同步工作流而不必使用TaskCompletionSource進行通信?

謝謝!

+1

我沒有使用TPL數據流,所以這可能是遙遠,但如果你需要的動作完成時就知道了,好像你不希望發佈Post的'fire and forget',而應該使用SendAsync,以便讓Task返回而不是? http://msdn.microsoft.com/en-us/library/hh194681(v=vs.110) –

+1

據我瞭解,當帖子被接受或明確拒絕時,SendAsync會返回一個任務。因爲我使用無限制的動作塊。帖子將始終被直接接受。 SendAsync不能幫助我的案例 – Polity

回答

4

首先,您根本不需要TPL Dataflow,因爲您實際上沒有任何數據流。

其次,使用TaskScheduler這樣的s也不是正確的解決方案。 TaskScheduler的日程安排代碼,但是當你有一些東西時,沒有代碼在運行。因此,雖然WriteAsync()正在執行其異步工作,但代碼Open()可以運行。

你真正需要的是像ReaderWriterLock,但與async效果很好。框架中沒有類似的東西,但是您可以使用Stephen Toub's article Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock中的代碼,它完全符合您的需求。文章還詳細解釋了爲什麼使用TaskScheduler是錯誤的。

使用AsyncReaderWriterLock,你的代碼可能是這樣的:

public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken) 
{ 
    using (await readerWriterLock.ReaderLockAsync()) 
    { 
     if (!tcpClient.Connected) 
      throw new InvalidOperationException("Can't send when not open"); 

     await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken); 
    } 
} 
+0

在閱讀OP的問題時,我的第一個想法是鏈接到Toub的文章並給出一些背景知識。我猶豫不決,因爲我不記得Toub寫過關於這個問題的博客文章,白皮書等。然後我看到這個答案已經熟練地解釋了它,並鏈接到了確切的文章!每隔一段時間,您都會閱讀一個答案,希望您可以多次讚揚! – payo

+0

謝謝你的回答,儘管它還沒有讓我滿意。打開和關閉應完全由發送和接收以及另一個來運行。儘管在使用其中的2個時異步rwlock會起作用,但這將是一個難以理解的解決方案。基於ConcurrentExclusiveSchedulerPair調度任務似乎是簡單而易於同步的。我認爲你錯誤的假設是Open應該能夠在Send運行時運行,而不是這種情況。當打開被調用時,任何正在運行的發送/接收應該在實際打開之前完成。 – Polity

+0

@Polity我的觀點是,如果你有'async'方法並且使用'ConcurrentExclusiveSchedulerPair',那麼當'Send()'正在等待時,'Open()'將能夠運行。但是如果你使用'AsyncReaderWriterLock',它不會。我認爲這正是你想要的。 – svick