2008-12-16 79 views
17

我正在開發一個程序,它會在後臺持續收發數據流,我希望允許用戶爲上傳和下載速度設置上限。(標題:C# 中的帶寬限制)

我查閱了令牌桶(token bucket)和漏桶(leaky bucket)算法(algorithms)的資料,後者似乎更符合我的需求,因爲這裏的目標不是最大化網絡帶寬,而是儘可能不引人注目。

但是,我有點不確定該如何實現這一點。一種自然的方法是擴展抽象的 Stream 類,這樣就能方便地套用到現有的數據流上;但是這樣做會不會需要額外的線程,才能在接收數據的同時發送數據(漏桶)呢?如果有其他能實現相同效果的方案,任何提示都將不勝感激。

此外,儘管我可以控制程序接收的數據量,但是在 C# 層面做帶寬限制的效果究竟如何?計算機是否仍會照常收到數據並把它緩存起來,從而實際上抵消了節流效果,還是會等到我要求接收更多數據時才繼續接收?

編輯:我希望同時對傳入和傳出的數據進行節流,而我對流的另一端沒有控制權。

回答

1

針對 arul 提到的 ThrottledStream 類,我想出了一個不同的實現。我的版本使用 WaitHandle 和一個間隔爲 1 秒的定時器:

/// <summary>
/// Wraps <paramref name="parentStream"/> and starts the one-second timer that
/// clears the per-second byte counter.
/// </summary>
/// <param name="parentStream">The stream that reads and writes are forwarded to.</param>
/// <param name="maxBytesPerSecond">Byte budget per timer interval; values below 1 are rejected by the property setter.</param>
public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue)
{
    // Assign through the property first so an invalid limit is rejected
    // before the timer is created.
    MaxBytesPerSecond = maxBytesPerSecond;

    parent = parentStream;
    processed = 0;

    // The budget window is fixed at 1000 ms.
    resettimer = new System.Timers.Timer { Interval = 1000 };
    resettimer.Elapsed += resettimer_Elapsed;
    resettimer.Start();
}

/// <summary>
/// Charges <paramref name="bytes"/> against the current one-second budget and,
/// once the budget is used up, blocks the caller until the next timer tick.
/// </summary>
/// <param name="bytes">Number of bytes being transferred.</param>
protected void Throttle(int bytes)
{
    processed += bytes;
    if (processed < maxBytesPerSecond)
    {
        return;
    }

    try
    {
        // The timer signals the handle on every tick (and the handle starts out
        // signalled), so it is usually already set when the budget runs out.
        // Clear that stale signal first; otherwise WaitOne() returns at once
        // and nothing is throttled.
        wh.Reset();
        wh.WaitOne();
    }
    catch (ObjectDisposedException)
    {
        // The wait handle was closed while the stream was being torn down;
        // there is nothing left to throttle. Any other exception propagates
        // instead of being silently swallowed.
    }
}

/// <summary>
/// Timer callback: starts a new budget window by clearing the byte counter and
/// signalling the wait handle that <c>Throttle</c> blocks on.
/// </summary>
/// <param name="sender">The timer raising the event (unused).</param>
/// <param name="e">Event data (unused).</param>
private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    // Clear the counter before signalling so a released caller sees the fresh budget.
    processed = 0; 
    wh.Set(); 
} 

每當超過帶寬限制時,線程就會休眠,直到下一秒開始。無需計算最佳休眠時間。

完整實現:

/// <summary>
/// A <see cref="Stream"/> wrapper that limits how many bytes per second are read
/// from or written to the wrapped stream.
/// </summary>
/// <remarks>
/// A timer ticks once per second and grants a budget of
/// <see cref="MaxBytesPerSecond"/> bytes. Every read and write is charged against
/// that budget; once it is used up the calling thread blocks until enough ticks
/// have passed to pay off the bytes already transferred. Reads and writes share
/// one budget. Disposing this stream stops the timer and releases blocked
/// callers; it does not dispose the wrapped stream.
/// </remarks>
public class ThrottledStream : Stream
{
    #region Properties

    private int maxBytesPerSecond;

    /// <summary>
    /// Number of bytes that are allowed per second.
    /// </summary>
    /// <exception cref="ArgumentException">The value is smaller than 1.</exception>
    public int MaxBytesPerSecond
    {
        get { return maxBytesPerSecond; }
        set
        {
            if (value < 1)
            {
                throw new ArgumentException("MaxBytesPerSecond has to be >0");
            }

            maxBytesPerSecond = value;
        }
    }

    #endregion


    #region Private Members

    // Guards processed/disposed, which are touched by callers and by the timer thread.
    private readonly object gate = new object();

    // Signalled by the timer on each tick. Manual-reset so that every blocked
    // caller is released by a single tick, not just one of them.
    private readonly ManualResetEvent wh = new ManualResetEvent(false);

    private readonly System.Timers.Timer resettimer;
    private readonly Stream parent;

    // Bytes charged but not yet paid off by timer ticks. A long, so that adding
    // to it cannot overflow when the limit is int.MaxValue.
    private long processed;

    private bool disposed;

    #endregion

    /// <summary>
    /// Creates a new stream with a bandwidth cap.
    /// </summary>
    /// <param name="parentStream">The stream that reads and writes are forwarded to.</param>
    /// <param name="maxBytesPerSecond">Maximum number of bytes per second; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="parentStream"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="maxBytesPerSecond"/> is smaller than 1.</exception>
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue)
    {
        if (parentStream == null)
        {
            throw new ArgumentNullException("parentStream");
        }

        // Validate the limit before the timer is created so that a rejected
        // argument does not leave a running timer behind.
        MaxBytesPerSecond = maxBytesPerSecond;
        parent = parentStream;
        processed = 0;

        resettimer = new System.Timers.Timer();
        resettimer.Interval = 1000;
        resettimer.Elapsed += resettimer_Elapsed;
        resettimer.Start();
    }

    /// <summary>
    /// Charges <paramref name="bytes"/> against the budget and blocks the caller
    /// while the outstanding amount is at or above <see cref="MaxBytesPerSecond"/>.
    /// </summary>
    /// <param name="bytes">Number of bytes transferred; values of 0 or less are ignored.</param>
    protected void Throttle(int bytes)
    {
        if (bytes <= 0)
        {
            return;
        }

        lock (gate)
        {
            processed += bytes;
        }

        while (true)
        {
            lock (gate)
            {
                if (disposed || processed < maxBytesPerSecond)
                {
                    return;
                }

                // Clear the signal left over from earlier ticks; otherwise
                // WaitOne() would return immediately and nothing would be throttled.
                wh.Reset();
            }

            try
            {
                wh.WaitOne();
            }
            catch (ObjectDisposedException)
            {
                // The stream was disposed between the check above and the wait.
                return;
            }
        }
    }

    /// <summary>
    /// Timer callback: pays off up to <see cref="MaxBytesPerSecond"/> bytes of the
    /// outstanding amount and wakes every blocked caller so it can re-check.
    /// </summary>
    private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
    {
        lock (gate)
        {
            // An Elapsed event can still arrive after the timer was stopped.
            if (disposed)
            {
                return;
            }

            // Subtract instead of zeroing, so a transfer larger than the limit
            // keeps callers blocked for as many ticks as it takes to pay it off.
            processed = Math.Max(0L, processed - maxBytesPerSecond);
            wh.Set();
        }
    }

    #region Stream-Overrides

    /// <summary>
    /// Stops the timer, releases any caller blocked in <see cref="Throttle"/> and
    /// frees the wait handle. <see cref="Stream.Close"/> ends up here as well, so
    /// no separate Close override is needed. The wrapped stream is left open.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            bool firstCall;
            lock (gate)
            {
                firstCall = !disposed;
                disposed = true;
                if (firstCall)
                {
                    // Wake blocked callers; they see the disposed flag and return.
                    wh.Set();
                }
            }

            if (firstCall)
            {
                resettimer.Stop();
                resettimer.Elapsed -= resettimer_Elapsed;
                resettimer.Dispose();
                wh.Close();
            }
        }

        base.Dispose(disposing);
    }

    public override bool CanRead
    {
        get { return parent.CanRead; }
    }

    public override bool CanSeek
    {
        get { return parent.CanSeek; }
    }

    public override bool CanWrite
    {
        get { return parent.CanWrite; }
    }

    public override void Flush()
    {
        parent.Flush();
    }

    public override long Length
    {
        get { return parent.Length; }
    }

    public override long Position
    {
        get { return parent.Position; }
        set { parent.Position = value; }
    }

    /// <summary>
    /// Reads from the wrapped stream and charges the number of bytes actually
    /// returned (which may be fewer than <paramref name="count"/>) to the budget.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return parent.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        parent.SetLength(value);
    }

    /// <summary>
    /// Charges <paramref name="count"/> bytes to the budget, waiting if necessary,
    /// and then writes to the wrapped stream.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        parent.Write(buffer, offset, count);
    }

    #endregion
}
+0

如果在計時器觸發時不是把 `processed` 設爲 `0`,而是從中減去 `maxBytesPerSecond`,結果會更加準確。 – 2017-02-09 06:25:27

1

基於 @0xDEADBEEF 的解決方案,我創建了以下基於 Rx 調度器的(可測試的)解決方案:

/// <summary>
/// A <see cref="Stream"/> wrapper that limits the average transfer rate of the
/// wrapped stream, using an Rx scheduler as its clock so that tests can drive
/// time explicitly.
/// </summary>
/// <remarks>
/// The total number of bytes transferred since construction is compared with the
/// time elapsed on the scheduler's stopwatch; whenever the transfer is ahead of
/// the configured rate, the calling thread blocks until the scheduler reports
/// that the difference has passed. Reads and writes share one counter, and the
/// counter is not synchronised.
/// </remarks>
public class ThrottledStream : Stream
{
    private readonly Stream parent;
    private readonly int maxBytesPerSecond;
    private readonly IScheduler scheduler;
    private readonly IStopwatch stopwatch;

    // Total bytes charged since the stopwatch was started.
    private long processed;

    /// <summary>
    /// Creates a throttled wrapper around <paramref name="parent"/>.
    /// </summary>
    /// <param name="parent">The stream that reads and writes are forwarded to.</param>
    /// <param name="maxBytesPerSecond">Maximum average number of bytes per second; must be at least 1.</param>
    /// <param name="scheduler">Scheduler that supplies the stopwatch and the sleep operation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="parent"/> or <paramref name="scheduler"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxBytesPerSecond"/> is smaller than 1.</exception>
    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler)
    {
        if (parent == null)
        {
            throw new ArgumentNullException("parent");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        // A rate of 0 would make the target time in Throttle infinite (or NaN) and
        // fail on the first transfer; a negative rate would silently disable
        // throttling. Reject both up front.
        if (maxBytesPerSecond < 1)
        {
            throw new ArgumentOutOfRangeException("maxBytesPerSecond", "MaxBytesPerSecond has to be >0");
        }

        this.maxBytesPerSecond = maxBytesPerSecond;
        this.parent = parent;
        this.scheduler = scheduler;
        stopwatch = scheduler.StartStopwatch();
        processed = 0;
    }

    /// <summary>
    /// Creates a throttled wrapper around <paramref name="parent"/> that uses
    /// <c>Scheduler.Immediate</c> as its scheduler.
    /// </summary>
    /// <param name="parent">The stream that reads and writes are forwarded to.</param>
    /// <param name="maxBytesPerSecond">Maximum average number of bytes per second; must be at least 1.</param>
    public ThrottledStream(Stream parent, int maxBytesPerSecond)
        : this (parent, maxBytesPerSecond, Scheduler.Immediate)
    {
    }

    /// <summary>
    /// Charges <paramref name="bytes"/> to the running total and blocks the caller
    /// until the elapsed time has caught up with the time that total should have
    /// taken at the configured rate.
    /// </summary>
    /// <param name="bytes">Number of bytes transferred; values of 0 or less are ignored.</param>
    protected void Throttle(int bytes)
    {
        if (bytes <= 0)
        {
            // Nothing was transferred (for example a read at end of stream).
            return;
        }

        processed += bytes;

        // Time at which `processed` bytes are due at the configured rate.
        var targetTime = TimeSpan.FromSeconds((double)processed/maxBytesPerSecond);
        var actualTime = stopwatch.Elapsed;
        var sleep = targetTime - actualTime;
        if (sleep > TimeSpan.Zero)
        {
            // Turn the scheduler's awaitable sleep into a blocking wait, because
            // Read and Write are synchronous.
            using (var waitHandle = new AutoResetEvent(initialState: false))
            {
                scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set());
                waitHandle.WaitOne();
            }
        }
    }

    public override bool CanRead
    {
        get { return parent.CanRead; }
    }

    public override bool CanSeek
    {
        get { return parent.CanSeek; }
    }

    public override bool CanWrite
    {
        get { return parent.CanWrite; }
    }

    public override void Flush()
    {
        parent.Flush();
    }

    public override long Length
    {
        get { return parent.Length; }
    }

    public override long Position
    {
        get { return parent.Position; }
        set { parent.Position = value; }
    }

    /// <summary>
    /// Reads from the wrapped stream, then charges the number of bytes actually
    /// returned.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        var read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return parent.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        parent.SetLength(value);
    }

    /// <summary>
    /// Charges <paramref name="count"/> bytes, waiting if necessary, and then
    /// writes to the wrapped stream.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        parent.Write(buffer, offset, count);
    }
}

以及一些測試,它們只需幾毫秒即可運行完畢:

// Copies 1 MiB out of a throttled source limited to one eighth of the payload per
// second, and checks against virtual time that the copy finishes at exactly 8 s.
[TestMethod]
public void ShouldThrottleReading()
{
    byte[] payload = Enumerable.Repeat((byte)'a', 1024 * 1024).ToArray();
    var virtualClock = new TestScheduler();
    var throttledSource = new ThrottledStream(new MemoryStream(payload), payload.Length/8, virtualClock);
    var sink = new MemoryStream();

    var copy = throttledSource.CopyToAsync(sink);

    // Not finished before time moves, at the half-way mark, or one tick before the end.
    copy.Wait(10).Should().BeFalse();
    long finishTicks = TimeSpan.FromSeconds(8).Ticks;
    foreach (var checkpoint in new[] { TimeSpan.FromSeconds(4).Ticks, finishTicks - 1 })
    {
        virtualClock.AdvanceTo(checkpoint);
        copy.Wait(10).Should().BeFalse();
    }

    // Reaching the 8 second mark lets the copy complete.
    virtualClock.AdvanceTo(finishTicks);
    copy.Wait(10).Should().BeTrue();
}

// Copies 1 MiB into a throttled target limited to one eighth of the payload per
// second, and checks against virtual time that the copy finishes at exactly 8 s.
[TestMethod]
public void ShouldThrottleWriting()
{
    byte[] payload = Enumerable.Repeat((byte)'a', 1024 * 1024).ToArray();
    var virtualClock = new TestScheduler();
    var plainSource = new MemoryStream(payload);
    var throttledSink = new ThrottledStream(new MemoryStream(), payload.Length/8, virtualClock);

    var copy = plainSource.CopyToAsync(throttledSink);

    // Not finished before time moves, at the half-way mark, or one tick before the end.
    copy.Wait(10).Should().BeFalse();
    long finishTicks = TimeSpan.FromSeconds(8).Ticks;
    foreach (var checkpoint in new[] { TimeSpan.FromSeconds(4).Ticks, finishTicks - 1 })
    {
        virtualClock.AdvanceTo(checkpoint);
        copy.Wait(10).Should().BeFalse();
    }

    // Reaching the 8 second mark lets the copy complete.
    virtualClock.AdvanceTo(finishTicks);
    copy.Wait(10).Should().BeTrue();
}