2017-03-16 105 views
0

將項目發佈到TPL DataFlow時,是否有任何機制可以允許延遲發佈?延遲發佈到DataFlow

public partial class BasicDataFlowService 
{ 
    private readonly ActionBlock<string> workerBlock; 

    public BasicDataFlowService() 
    { 
     workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions() 
     { 
      MaxDegreeOfParallelism = 32 
     }); 
    } 

    partial void DoWork(string fileName); 

    private void AddToDataFlow(string file) 
    { 
     workerBlock.Post(file); 
    } 
} 

AddToDataFlow,我希望能夠指定一個延遲的項目被處理之前(例如,如果我們決定我們要推遲30秒的處理)。

我的確考慮使用TransFormBlocknew System.Threading.ManualResetEvent(false).WaitOne(1000);,例如,

var requeueBlock = new TransformBlock<string, string>(file => 
{ 
    new System.Threading.ManualResetEvent(false).WaitOne(1000); 
    return file; 
}); 

requeueBlock.LinkTo(workerBlock); 

但是,這似乎是消耗了一個不必要的線程,可以被鏈中的其他塊使用。

回答

0

首先,您需要將ManualResetEvent作爲單例存儲,否則所有線程都會得到自己的對象來等待,並且您的方法不起作用。

其次,如果您需要在流水線中的一個AppDomain內執行同步,請考慮ManualResetEventSlim版本而不是重型ManualResetEvent

如果你想重複使用你的機器的核心,而沒有無用的等待,你應該看看SpinWait輕量級結構。您可能會發現Joseph Albahari' article有用在這種情況下:

// singleton variable 
bool _proceed; 

var requeueBlock = new TransformBlock<string, string>(file => 
{ 
    var spinWait = new SpinWait(); 
    while (!_proceed) 
    { 
     // ensure we have the latest _proceed value 
     Thread.MemoryBarrier(); 
     // try to spin for a while 
     // after some spins, yield to another thread 
     spinWait.SpinOnce(); 
    } 
    return file; 
}); 

SpinWait內部決定,如何一代產量:與Sleep(0)Sleep(1)Yield方法調用,所以它對於你的情況相當有效。

0

要在將值發佈到workerBlock之前添加延遲,只需插入延遲並在發佈該值之前等待它。如果您的workerBlock具有有限容量,您可以await SendAsync。幾個選項來完成目標:

private async Task AddToDataflow(string file, TimeSpan delay) { 
    await Task.Delay(delay); 
    await workerBlock.SendAsync(file); 
} 

private async Task AddToDataflow(string file) { 
    var delay = TimeSpan.FromSeconds(30); 
    await Task.Delay(delay); 
    await workerBlock.SendAsync(file); 
} 

private async void AddToDataflow(string file) { 
    var delay = TimeSpan.FromSeconds(30); 
    await Task.Delay(delay); 
    workerBlock.Post(file); 
}