2012-09-20 52 views
5

我有一個BufferBlock,而我發帖:TPL數據流LinkTo多個消費者不工作

public class DelimitedFileBlock : ISourceBlock<string> 
{ 
    private ISourceBlock<string> _source; 
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 }); 

    //Read a file 
    While(!eof) 
     row = read one row 
    //if consumers are slow, then sleep for a while 
    while(!(_source as BufferBlock<string>).Post<string>(row)) 
    { 
     Thread.Sleep(5000); 
    } 
} 

這是24萬行5GB的文件。

我現在有被使用ActionBlock目標塊:

public class SolaceTargetBlock : ITargetBlock<string> 
     private ActionBlock<IBasicDataContract> _publishToSolaceBlock; 

     public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept) 
    { 
     //post to another block to publish 
     bool success = _publishToSolaceBlock.Post(messageValue); 

現在,在一個控制檯應用程序,我指定:

SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
      new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 }); 
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
      new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 }); 
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
      new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 }); 

DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo); 

我一直有限容量爲1只測試。

,現在我使用LinkTo這三個消費者鏈接到我的源:

delimitedFileBlock.LinkTo(solaceTargetBlock1);  
delimitedFileBlock.LinkTo(solaceTargetBlock2);  
delimitedFileBlock.LinkTo(solaceTargetBlock3);  

這正好Thread.sleep代碼(5000)後,10003行語句和郵政while循環始終返回false。

我期待着,因爲我有LinkTo,所以solaceTargetBlocks完成後將能夠挑選下一條消息,但LinkTo不會清除BufferBlock。那麼,我怎樣才能在多個消費者之間進行負載平衡。我是否必須接收並寫入一個簡單的負載平衡邏輯以在消費者之間分配?

+1

這一切都取決於你究竟是如何實現的塊接口。但是,除非你正在做更復雜的事情,否則你不必自己實現接口(也可能不應該)。只需創建一個您需要的塊設置即可。 – svick

回答

8

Post methodDataflowBlock<T> class(重點煤礦)的文檔:

此方法將返回一旦目標塊已決定接受或拒絕項目,

這意味着目標可以選擇拒絕該塊(這是您所看到的行爲)。

而且,它指出:

對於支持推遲要約的消息目標塊,或者選擇那些在其實施後做更多的處理模塊,可以考慮使用SendAsync,這將立即返回,並將使目標是推遲發佈的消息,並在SendAsync返回後使用它。

這意味着你可能更好的結果(根據目標區塊),因爲你的信息可能會被推遲,但仍處理,而不是斷然拒絕。

我想象的BufferBlock<T>兩個BoundedCapacity property設置和三個ActionBlock<TInput>實例必須是與你看到的:

  • BufferBlock<T>最大緩衝區爲10000;一旦您將10,000個物品放入隊列中,它將拒絕剩餘的物品(請參閱上面的第二個報價單),因爲它無法處理它們(因爲它無法緩衝要推遲的消息,所以SendAsync也不會在此處起作用)。

  • 您在ActionBlock<TInput>情況下,最大的緩衝區是1,你有三個。

10000+(1×3)= 10,000 + 3 = 1萬3

要解決這個問題,你需要做的幾件事情。

首先,在創建ActionBlock<TInput>實例時,您需要爲MaxDegreeOfParallelism property設置更合理的值ExecutionDataFlowBlockOptions

缺省情況下,MaxDegreeOfParallelism用於ActionBlock<TInput>被設置爲1;這保證了調用將被序列化,並且您不必擔心線程安全性。如果您希望ActionBlock<T>關注線程安全性,請保留此設置。

如果ActionBlock<TInput>線程安全的,那麼你沒有理由阻止它,你應該設置MaxDegreeOfParallelismDataflowBlockOptions.Unbounded

機會是,如果你正在訪問某種在可以同時在有限的基礎上進行訪問ActionBlock<TInput>共享資源,那麼你很可能做錯事。

如果您有某種共享資源,那麼您應該通過另一個塊運行它,並將MaxDegreeOfParallelism設置爲。

其次,如果您關心的吞吐量,並確定與掉落的物品,那麼你應該設置BoundedCapacity屬性。

另外請注意,您表示:「如果消費者是緩慢的,睡一會」;如果你正確連接你的數據塊,沒有理由這樣做,你應該讓數據流通並將限制僅限於需要它們的地方。您的製片人不應負責限制消費者,讓消費者負責限制。

最後,你的代碼看起來並不像你需要自己實現數據流塊接口。你可以構建像這樣:

// The source, your read lines will be posted here. 
var delimitedFileBlock = new BufferBlock<string>(); 

// The Action for the action blocks. 
Action<string> action = 
    s => { /* Do something with the string here. */ }; 

// Create the action blocks, assuming that 
// action is thread-safe, no need to have it process one at a time 
// or to bound the capacity. 
var solaceActionBlock1 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 
var solaceActionBlock2 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 
var solaceActionBlock3 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 

// Link everything. 
delimitedFileBlock.LinkTo(solaceTargetBlock1); 
delimitedFileBlock.LinkTo(solaceTargetBlock2); 
delimitedFileBlock.LinkTo(solaceTargetBlock3); 

// Now read the file, and post to the BufferBlock<T>: 
// Note: This is pseudo-code. 
while (!eof) 
{ 
    // Read the row. 
    string row = ...; 

    delimitedFileBlock.Post(read); 
} 

還要注意的是有三個ActionBlock<TInput>實例是uncessary,除非你需要過濾輸出到不同的動作(這你是不是在這裏做什麼),因此上述確實減少了(假設你的行動是線程安全的,所以你要增加MaxDegreeOfParallelismUnbounded反正):

// The source, your read lines will be posted here. 
var delimitedFileBlock = new BufferBlock<string>(); 

// The Action for the action blocks. 
Action<string> action = 
    s => { /* Do something with the string here. */ }; 

// Create the action blocks, assuming that 
// action is thread-safe, no need to have it process one at a time 
// or to bound the capacity. 
var solaceActionBlock1 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 

// Link everything. 
delimitedFileBlock.LinkTo(solaceTargetBlock); 

// Now read the file, and post to the BufferBlock<T>: 
// Note: This is pseudo-code. 
while (!eof) 
{ 
    // Read the row. 
    string row = ...; 

    delimitedFileBlock.Post(read); 
} 
相關問題