我有一個BufferBlock,而我發帖:TPL數據流LinkTo多個消費者不工作
public class DelimitedFileBlock : ISourceBlock<string>
{
private ISourceBlock<string> _source;
_source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });
//Read a file
While(!eof)
row = read one row
//if consumers are slow, then sleep for a while
while(!(_source as BufferBlock<string>).Post<string>(row))
{
Thread.Sleep(5000);
}
}
這是24萬行5GB的文件。
我現在有被使用ActionBlock目標塊:
public class SolaceTargetBlock : ITargetBlock<string>
private ActionBlock<IBasicDataContract> _publishToSolaceBlock;
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
{
//post to another block to publish
bool success = _publishToSolaceBlock.Post(messageValue);
現在,在一個控制檯應用程序,我指定:
SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);
我一直有限容量爲1只測試。
,現在我使用LinkTo這三個消費者鏈接到我的源:
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);
這正好Thread.sleep代碼(5000)後,10003行語句和郵政while循環始終返回false。
我期待着,因爲我有LinkTo,所以solaceTargetBlocks完成後將能夠挑選下一條消息,但LinkTo不會清除BufferBlock。那麼,我怎樣才能在多個消費者之間進行負載平衡。我是否必須接收並寫入一個簡單的負載平衡邏輯以在消費者之間分配?
這一切都取決於你究竟是如何實現的塊接口。但是,除非你正在做更復雜的事情,否則你不必自己實現接口(也可能不應該)。只需創建一個您需要的塊設置即可。 – svick