2012-03-20 118 views
5

我有一個要求將Web服務請求發送到在線api,我認爲並行擴展將非常適合我的需求。任務並行庫 - 自定義任務計劃程序

有問題的Web服務被設計爲重複調用,但如果您每秒接收到一定數量的呼叫,則會有一個機制向您收費。我顯然希望儘量減少我收取的費用等不知道是否有人已經看到了的TaskScheduler可以用以下要求處理:

  1. 限制計劃每時間跨度的任務數。我想如果請求的數量超過了這個限制,那麼它將需要丟棄任務或可能阻止? (停止任務的後臺日誌)
  2. 檢測相同的請求是否已經在要執行但尚未執行的調度程序中,如果沒有,則不排隊第二個任務,而是返回第一個任務。

人們是否覺得這些是任務調度程序應該處理的責任或者是我在吠叫錯誤的樹?如果你有其他選擇,我願意接受建議。

+0

我認爲至少#2是不可能做到與TaskScheduler,因爲它處理「任務」,並且沒有辦法從中獲取這些信息。 – svick 2012-03-20 22:25:43

+0

您是否對使用C#5/.Net 4.5的解決方案感興趣? – svick 2012-03-20 22:26:10

+0

@svick絕對,我有幸被允許使用c#5 – Fen 2012-03-20 22:32:36

回答

7

我同意其他人TPL數據流聽起來這是很好的解決方案。

要限制處理,你可以創建一個TransformBlock實際上並沒有以任何方式轉換數據,它只是延緩它,如果它以前的數據後立即趕到:

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay) 
{ 
    DateTime lastItem = DateTime.MinValue; 
    return new TransformBlock<T, T>(
     async x => 
       { 
        var waitTime = lastItem + delay - DateTime.UtcNow; 
        if (waitTime > TimeSpan.Zero) 
         await Task.Delay(waitTime); 

        lastItem = DateTime.UtcNow; 

        return x; 
       }, 
     new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 
} 

然後創建一個方法產生的數據(從0開始的整數例子):

static async Task Producer(ITargetBlock<int> target) 
{ 
    int i = 0; 
    while (await target.SendAsync(i)) 
     i++; 
} 

它的異步寫入,因此,如果目標塊是不能夠馬上處理的項目,它會等待。

然後寫一個消費者方法:

static void Consumer(int i) 
{ 
    Console.WriteLine(i); 
} 

最後,連在一起這一切,並啓動它:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500)); 

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer, 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); 

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true }); 

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion); 

這裏,delayBlock將接受最多一個每500毫秒項和方法可以並行運行多次。要完成處理,請撥打delayBlock.Complete()

如果你想爲你的#2添加一些緩存,你可以創建另一個TransformBlock做那裏的工作,並將其鏈接到其他塊。

+0

賓果,這正是我想到的。在你提供實際的實現方面很好,我只是無法找到時間。 – 2012-03-21 17:58:25

+1

整潔。有一件事要注意,如果你運行的是異步CTP而不是.NET 4。5,您需要將Task.Delay更改爲TaskEx.Delay。 – 2012-03-21 19:21:23

+0

@ DPeden,我認爲你的意思是.Net 4.5(目前處於測試階段)。 – svick 2012-03-21 19:23:13

0

如果你需要時間節流,你應該檢查出Quartz.net。它可以促進一致的投票。如果你關心所有的請求,你應該考慮使用某種排隊機制。 MSMQ可能是正確的解決方案,但是如果您想要擴大規模並使用像NServiceBusRabbitMQ這樣的ESB,則有許多特定的實施方案。

更新:

在這種情況下,TPL數據流是首選的解決方案,如果你能充分利用CTP。受限制的BufferBlock是解決方案。

這個例子來自documentation provided by Microsoft

// Hand-off through a bounded BufferBlock<T> 
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 }); 

// Producer 
private static async void Producer() 
{ 
    while(true) 
    { 
     await m_buffer.SendAsync(Produce()); 
    } 
} 

// Consumer 
private static async Task Consumer() 
{ 
    while(true) 
    { 
     Process(await m_buffer.ReceiveAsync()); 
    } 
} 

// Start the Producer and Consumer 
private static async Task Run() 
{ 
    await Task.WhenAll(Producer(), Consumer()); 
} 

更新:

退房RX的Observable.Throttle

+0

我想保持一切正在進行中,所以排除了排隊作爲解決方案。我不認爲Quartz是正確的解決方案,調度不是問題,它限制了請求的數量,並且對我所做的調用有點聰明。感謝您的建議,雖然 – Fen 2012-03-20 22:12:17

+0

@DarenFox我根據您的反饋更新了我的答案。 – 2012-03-20 22:17:21

+0

@DarenFox這究竟如何防止每秒超過_N_個請求?看起來你只能限制10個併發未處理的調用,但很明顯,如果調用速度超過每秒10毫秒,可能會超過每秒限制。 FWIW,我回答說TPL數據流可能是最好的方式,但是您在技術上需要一個時間性的BufferBlock實現。 – 2012-03-20 22:24:44

3

老實說,我會在更高層次的抽象工作,並使用TPL Dataflow API。唯一的問題是您需要編寫一個自定義塊,以您需要的速度調整請求,因爲默認情況下,塊是「貪婪」的,並且會盡可能快地進行處理。實現將如下所示:

  1. BufferBlock<T>開始,這是您要發佈到的邏輯塊。
  2. BufferBlock<T>鏈接到具有請求/秒和限制邏輯知識的自定義塊。
  3. 將自定義塊從2鏈接到您的ActionBlock<T>

我沒有時間在第二秒寫上#2的自定義塊,但是如果您還沒有弄明白,我會稍後再回來檢查並嘗試填寫實施。

+0

由於我沒有查看TPL Dataflow,我真的很感激你的幫助 – Fen 2012-03-20 22:46:08