2014-02-14 30 views
1

我正在學習TPL數據流,我已經通過我的一些朋友看到了它的強大功能,我遇到了一個與我的實現有關的問題。以無限並行方式多次運行相同的ActionBlock

我想/需要的是儘可能快地發送消息。我在做一些LinqPad原型,這是我迄今:

// Holds all the messages for my loadMessage ActionBlock to grab its data from 
var bufferBlock = new BufferBlock<string>(); 

// Sends message to where it needs to go as fast as it can. 
var loadMessage = new ActionBlock<string>(msg => 
{ 
    msg.Dump(); 
}, 
new ExecutionDataflowBlockOptions 
{ 
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
}); 

// Links the blocks together 
bufferBlock.LinkTo(loadMessage); 

// Loads the Buffer 
for (int i = 0; i < 10; i++) 
{ 
    bufferBlock.Post("This is a message"); 
} 

//Calls completion to stop threads 
bufferBlock.Complete(); 
loadMessage.Complete(); 

的問題是,loadMessageBlock沒有在例子傾銷上述消息。我一直在尋找一些有點運氣的見解。我想我錯過了TPL的基本要求。我的理解是,BufferBlock保存要由其他塊處理的信息,並且ActionBlocked(與BufferBlock鏈接)應該從緩衝區中獲取數據並執行其所需的操作。在將信息放到緩衝區的For循環之後,停止完成被調用以停止線程。

在實現中,我有一個Parallel.For,它運行我的loadMessage內的代碼就好了。我只是不能實現TPL來做我想做的事情,我的理解是TPL將比Parallel.For更快。

我在這裏如何認爲這是假設工作?我錯誤地使用TPL嗎?我將繼續研究一個答案,任何指針將受到高度讚賞。謝謝!

回答

-2
  1. 我想/需要的是和我一樣快可以發送郵件:

    爲了實現這一目標,就需要發佈/同時接收數據,並從緩衝塊。下面是摘錄:

    var bufferBlock = new BufferBlock<string>(); 
    // Write to and read from the message block concurrently. 
    var post01 = Task.Run(() => 
    { 
        // Loads the Buffer 
        for (int i = 0; i < 10; i++) 
        { 
         bufferBlock.Post(string.Format("This is a message {0}",i)); 
        } 
    }); 
    var receive = Task.Run(() => 
    { 
        for (int i = 0; i < 10; i++) 
        { 
         var message = bufferBlock.Receive(); 
         message.Dump(); 
        } 
    }); 
    
    
    Task.WaitAll(post01, receive); 
    

    更多關於這一點,你可以看到在MSDN link

  2. 我的理解是,TPL會比更快的Parallel.For。

    這不正確,因爲它們使用相同的基礎結構。它們屬於同一個名稱空間System.Threading.Tasks

+0

感謝您的幫助。我沒有最終使用TPL來處理這種特殊情況,但我正在閱讀有關Task和Threading的更多信息以供將來實現。 – Schanckopotamus

+0

這裏沒有必要使用'Task'接收,這正是'ActionBlock'的作用。 – svick

+0

@svick無論你選擇什麼方式都很好,甚至在我發佈的MSDN鏈接的例子中,他們都使用任務。沒有理由認爲吃麪包比吃米飯更好! –

1

首先,一個關於術語的解釋:TPL(以下簡稱任務並行庫)是不一樣的TPL數據流,這只是一個子集。整個TPL包括諸如Parallel.For()Task類型的東西。

現在,您的代碼的問題是,您正在儘快完成loadMessage塊。致電Complete()後,該塊不會再接受任何消息,因此您發佈到bufferBlock的消息永遠不會達到loadMessage

你需要的只是在bufferBlock發送完所有消息後才需要完成loadMessage。這正是PropagateCompletion做:

bufferBlock.LinkTo(
    loadMessage, new DataflowLinkOptions { PropagateCompletion = true }); 

// post your data to bufferBlock here 

bufferBlock.Complete(); 
await loadMessage.Completion; 

而且,在這種特定的情況下,是不是需要在所有的bufferBlock,你可能只是直接發佈消息loadMessage

我的理解是,太平人壽將快於的Parallel.For

我不明白爲什麼應該在一般的快。一般情況下,他們的表現應該是可比的。所以你應該更好地使用適合你的問題的方法,而不是選擇一個,因爲「更快」。如果你真的關心性能,那就用兩種方式編寫代碼,然後測量哪一個更好。