2014-12-20 60 views
0

我正在使用.Net構建股票報價更新程序。假設在上市時間內有X個股票代碼需要更新。爲了保持更新速度不超過數據提供者的限制(例如雅虎財務),我將嘗試通過使用類似於線程池的機制來限制請求數/秒。假設我想只允許5個請求/秒,對應於5個線程池。使用任務並行庫處理頻繁的URL請求

我聽說過TPL並希望使用它,雖然我沒有經驗。我如何在任務中指定隱式使用的池中的線程數?這裏是一個循環來安排請求,其中requestFunc(url)是更新引號的函數。我喜歡從專家那裏得到一些意見或建議,正確地做到這一點:

// X is a number much bigger than 5 
List<Task> tasks = new List<Task>(); 
for (int i=0; i<X; i++) 
{ 
    Task t = Task.Factory.StartNew(() => { requestFunc(url); }, TaskCreationOptions.None); 
    t.Wait(100); //slow down 100 ms. I am not sure if this is the right thing to do 
    tasks.Add(t); 
} 

Task.WaitAll(tasks); 

好的,我添加了一個外部循環,使其連續運行。當我對@ steve16351的代碼做一些修改時,它只會循環一次。爲什麼????

static void Main(string[] args) 
    { 
     LimitedExecutionRateTaskScheduler scheduler = new LimitedExecutionRateTaskScheduler(5); 
     TaskFactory factory = new TaskFactory(scheduler); 
     List<string> symbolsToCheck = new List<string>() { "GOOG", "AAPL", "MSFT", "AGIO", "MNK", "SPY", "EBAY", "INTC" }; 


     while (true) 
     { 
      List<Task> tasks = new List<Task>(); 
      Console.WriteLine("Starting..."); 

      foreach (string symbol in symbolsToCheck) 
      { 
       Task t = factory.StartNew(() => { write(symbol); }, 
                    CancellationToken.None, TaskCreationOptions.None, scheduler); 
       tasks.Add(t); 
      } 
      //Task.WhenAll(tasks); 

      Console.WriteLine("Ending..."); 
      Console.Read(); 
     } 

     //Console.Read(); 
    } 

    public static void write (string symbol) 
    { 
     DateTime dateValue = DateTime.Now; 
     //Console.WriteLine("[{0:HH:mm:ss}] Doing {1}..", DateTime.Now, symbol); 
     Console.WriteLine("Date and Time with Milliseconds: {0} doing {1}..", 
       dateValue.ToString("MM/dd/yyyy hh:mm:ss.fff tt"), symbol); 
    } 
+0

是否要限制爲5個/秒或5個併發操作? – i3arnon

+0

是的,我主要關心的是請求的數量,因爲我的IP可能會被阻止,如果我超過數據提供商的限制太長時間/頻繁。只要系統允許,線程是次要的。 –

+0

是的,這是處理外部服務時常見的限制。 TPL Dataflow非常適合這一點。 – i3arnon

回答

0

您可以使用自定義任務計劃程序,該計劃程序限制任務可以開始的速率。

在下面的任務排隊,並出現一個定時器設置爲您的最大允許速率的頻率。所以如果每秒請求5次,計時器設置爲200ms。在tick上,一個任務會從正在等待的任務中出列並執行。

編輯:除了請求率,您還可以擴展到控制執行線程的最大數量。

static void Main(string[] args) 
{ 
    TaskFactory factory = new TaskFactory(new LimitedExecutionRateTaskScheduler(5, 5)); // 5 per second, 5 max executing 
    List<string> symbolsToCheck = new List<string>() { "GOOG", "AAPL", "MSFT" }; 

    for (int i = 0; i < 5; i++) 
     symbolsToCheck.AddRange(symbolsToCheck); 

    foreach (string symbol in symbolsToCheck) 
    { 
     factory.StartNew(() => 
     { 
      Console.WriteLine("[{0:HH:mm:ss}] [{1}] Doing {2}..", DateTime.Now, Thread.CurrentThread.ManagedThreadId, symbol); 
      Thread.Sleep(5000); 
      Console.WriteLine("[{0:HH:mm:ss}] [{1}] {2} is done", DateTime.Now, Thread.CurrentThread.ManagedThreadId, symbol); 
     }); 
    } 

    Console.Read(); 
} 


public class LimitedExecutionRateTaskScheduler : TaskScheduler 
{ 
    private ConcurrentQueue<Task> _pendingTasks = new ConcurrentQueue<Task>();    
    private readonly object _taskLocker = new object(); 
    private List<Task> _executingTasks = new List<Task>(); 
    private readonly int _maximumConcurrencyLevel = 5; 
    private Timer _doWork = null; 

    public LimitedExecutionRateTaskScheduler(double requestsPerSecond, int maximumDegreeOfParallelism) 
    { 
     _maximumConcurrencyLevel = maximumDegreeOfParallelism; 
     long frequency = (long)(1000.0/requestsPerSecond); 
     _doWork = new Timer(ExecuteRequests, null, frequency, frequency); 
    } 

    public override int MaximumConcurrencyLevel 
    { 
     get 
     { 
      return _maximumConcurrencyLevel; 
     } 
    } 

    protected override bool TryDequeue(Task task) 
    { 
     return base.TryDequeue(task); 
    } 
    protected override void QueueTask(Task task) 
    { 
     _pendingTasks.Enqueue(task); 
    } 

    private void ExecuteRequests(object state) 
    { 
     Task queuedTask = null; 
     int currentlyExecutingTasks = 0; 

     lock (_taskLocker) 
     { 
      for (int i = 0; i < _executingTasks.Count; i++) 
       if (_executingTasks[i].IsCompleted) 
        _executingTasks.RemoveAt(i--); 

      currentlyExecutingTasks = _executingTasks.Count; 
     } 

     if (currentlyExecutingTasks == MaximumConcurrencyLevel) 
      return; 

     if (_pendingTasks.TryDequeue(out queuedTask) == false) 
      return; // no work to do 

     lock (_taskLocker) 
      _executingTasks.Add(queuedTask); 

     base.TryExecuteTask(queuedTask); 
    } 

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 
    { 
     return false; // not properly implemented just to complete the class 
    } 

    protected override IEnumerable<Task> GetScheduledTasks() 
    { 
     return new List<Task>(); // not properly implemented just to complete the class 
    } 
} 
+0

嗨,史蒂夫,謝謝你的代碼。我也會測試這個。 –

+0

Steve的解決方案似乎設置了每秒請求的限制。有用!由於我需要在市場時間內更新所有符號,我認爲仍然需要控制運行線程的數量。 –

+0

@Shuang Liang,我已經擴展了這個例子,允許你限制線程的執行。 – steve16351

1

不要擔心線程的數量;只要確保你沒有超過每秒的請求數量即可。每200毫秒使用一個計時器發出ManualResetEvent的信號,並讓這些任務在循環內等待該ManualResetEvent。

要創建一個定時器,使其信號ManualResetEvent的每200毫秒:

resetEvent = new ManualResetEvent(false); 
timer = new Timer((state)=>resetEvent.Set(), 200, 0); 

確保您清理定時器(調用Dispose),當你不需要它了。

讓線程數由運行時確定。

如果您創建每個股票的單個任務,這將是一個糟糕的實施,因爲您不知道何時更新股票。

所以你可以把所有的股票放在一個列表中,並讓一個任務一個接一個地更新每個股票。

通過給另一個任務提供另一個股票列表,您可以通過將其計時器設置爲每250毫秒並將低優先級設置爲每1000毫秒來給該任務一個更高的優先級。這將每秒增加5次,高優先級列表比低優先級更新4次。

+0

感謝您的意見。實際上,我打算在一個查詢或任務中綁定100個符號(這似乎是雅虎設置的上限),以便請求總數將會更低,因此請求數/秒也是如此。我沒有比其他人更喜歡的股票名單。我的唯一目的是更新我的清單中的全部(約1000),但並非所有市場中的所有股票都是。什麼是語法「每200毫秒發出一次ManualResetEvent」?謝謝! –

+0

@ShuangLiang我加入到我的回答 –

0

您可以使用具有任務延遲的while循環來控制何時發出請求。使用異步無效方法發出請求意味着您不會被失敗的請求阻止。

異步無效是火災並忘記哪些開發者不接受,但我認爲這將作爲一種可能的解決方案在這種情況下。

我也認爲erno de weerd圍繞優先調用更重要的股票提出了一個很好的建議。

2

如果你想擁有URL請求的流量,同時限制在不超過5個併發操作,你應該使用TPL數據流的ActionBlock

var block = new ActionBlock<string>(
    url => requestFunc(url), 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

foreach (var url in urls) 
{ 
    block.Post(url); 
} 

block.Complete(); 
await block.Completion; 

Post給它的URL,併爲他們每個人它會在確保一次不超過MaxDegreeOfParallelism請求的同時執行請求。

完成後,您可以撥打Complete來發信號通知該塊完成,awaitCompletion任務將異步等待,直到該塊實際完成。

+0

嗨,很高興知道。我會測試這個想法。謝謝! –

+0

請注意,這不會執行每秒5個請求要求 –

+0

@ErnodeWeerd OP在註釋中確認這不是實際的要求。 – i3arnon

0

謝謝@ steve16351!它的工作原理是這樣的:

static void Main(string[] args) 
    { 
     LimitedExecutionRateTaskScheduler scheduler = new LimitedExecutionRateTaskScheduler(5); 
     TaskFactory factory = new TaskFactory(scheduler); 
     List<string> symbolsToCheck = new List<string>() { "GOOG", "AAPL", "MSFT", "AGIO", "MNK", "SPY", "EBAY", "INTC" }; 


     while (true) 
     { 
      List<Task> tasks = new List<Task>(); 
      foreach (string symbol in symbolsToCheck) 
      { 
       Task t = factory.StartNew(() => 
       { 
        write(symbol); 
       }, CancellationToken.None, 
        TaskCreationOptions.None, scheduler); 
       tasks.Add(t); 
      } 
     } 
    } 

    public static void write (string symbol) 
    { 
     DateTime dateValue = DateTime.Now; 
     Console.WriteLine("Date and Time with Milliseconds: {0} doing {1}..", 
       dateValue.ToString("MM/dd/yyyy hh:mm:ss.fff tt"), symbol); 
    }