2014-11-05 34 views
0

問題:我們有一些IP地址,那麼我們應該按照用戶指定的時間間隔來ping每一個,例如每400ms ping 192.168.0.1,每隔40000ms ping 192.168.137.20等等......我們如何以異步的方式處理這種情況?處理大量的異步作業

這是我從數據庫中加載目標信息,並創建一個傳感器爲他們每個人:

public class PingService 
{ 
    private CancellationTokenSource _cancel;   
    private List<PingSensor> _pings; 
    private IRepository<IPDevice> _deviceRepository; 


    public PingService() 
    { 
     _cancel = new CancellationTokenSource(); 
     _pings = new List<PingSensor>(); 
     _deviceRepository = ObjectFactory.GetInstance<IRepository<IPDevice>>(); 
    } 

    public void Start() 
    { 
     Action act = new Action(() => 
     { 
      IQueryable<IPDevice> allDevices = _deviceRepository.GetAll(); 

      foreach (IPDevice device in allDevices) 
      { 
       PingResultCollector collector = new PingResultCollector(device); 
       _pings.Add(new PingSensor(device.Address, collector, device.CheckDuration, _cancel.Token)); 
      } 

      foreach (PingSensor _ping in _pings) 
      { 
       _ping.DoDiscovery(); 
      } 
     }); 

     Task.Factory.StartNew(act); 
    } 

    public void Stop() 
    { 
     _cancel.Cancel(); 
    } 
} 

,這是我們做平,並等待用戶指定的延遲,收集響應和.. 。

public class PingSensor 
{ 
    private string _address;   
    Ping _ping; 
    private bool lastRequestReplayed; 
    private int _delay;   
    private CancellationToken _cancellationToken; 
    private PingResultCollector _resultCollector; 


    public PingSensor(string address, PingResultCollector resultCollector, int delay, CancellationToken CancellationToken) 
    { 
     _address = address; 
     _cancellationToken = CancellationToken; 
     _ping = new Ping(); 
     _resultCollector = resultCollector; 

     _ping.PingCompleted += _ping_PingCompleted; 
     _delay = delay; 
    } 

    void _ping_PingCompleted(object sender, PingCompletedEventArgs e) 
    { 
     if (_cancellationToken.IsCancellationRequested) 
      return; 

     lastRequestReplayed = true; 
     //_eventHandler(sender, e); 

     if (_resultCollector != null) 
      _resultCollector.CollectData(new PingStatus() 
      { 
       PingStatusId = Guid.NewGuid(), 
       Address = e.Reply == null ? "" : e.Reply.Address.ToString(), 
       Status = e.Reply == null ? e.Error.Message : e.Reply.Status.ToString(), 
       Target = e.UserState.ToString(), 
       ResponseTime = e.Reply == null ? 0 : e.Reply.RoundtripTime, 
       UpdateTime = DateTime.UtcNow 
      }); 
    } 

    public void DoDiscovery() 
    { 
     lastRequestReplayed = true; 
     Action act = new Action(() => 
     { 
      while (_cancellationToken.IsCancellationRequested!=true) 
      { 
       if (_cancellationToken.IsCancellationRequested) 
        return; 

       byte[] data = new byte[2]; 
       if (lastRequestReplayed) 
       { 
        _ping.SendAsync(_address, 30000, data, _address); 
        lastRequestReplayed = false; 
       } 
       if (_cancellationToken.IsCancellationRequested) 
        return; 
       Thread.Sleep(_delay); 
       if (_cancellationToken.IsCancellationRequested) 
        return; 
      } 
     }); 

     Task.Factory.StartNew(act); 
    } 
} 

以及與此代碼的問題是它創建每個傳感器一個線程,這意味着如果我有500目標萍我也有500個線程!任何建議?和我的英語不好很抱歉:d

+0

一個重要的說明,這_將不會啓動500個線程。這將**排隊** 500個任務,運行時將根據代碼運行的環境決定有多少線程適合執行,但是是一個'Task'!=線程。所以,如果你真的需要這500個任務,並行運行,那麼這可能不是一個好的解決方案。 – CodingGorilla 2014-11-05 20:22:18

+0

thx您的評論,我明白,我沒有測試它真的有500個任務,但對於70個目標我得到70個線程,這正是我的問題,一些任務彎腰很長時間,它可能需要30秒一個任務再次工作 – cyberw0lf 2014-11-05 20:33:05

+0

您是否可以選擇使用.Net 4.5? – 2014-11-05 20:47:44

回答

1

我能證明其工作代碼示例的概念..你來轉換成代碼,適合您的方案100%

using System; 
using System.Collections.Generic; 
using System.Net; 
using System.Net.NetworkInformation; 
using System.Threading.Tasks; 

namespace ConsoleApplication2 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      // Pick your long list of ip address 
      List<IPAddress> ips = new List<IPAddress> 
      { 
       new IPAddress(new byte[] {127, 0, 0, 1}), 
       new IPAddress(new byte[] {198, 252, 206, 16}), 
       new IPAddress(new byte[] {74, 125, 129, 99}), 
       // Add more ips as you like 
      }; 

      // Exactly what do you do with initiated tasks will depend on your specific scenario. 
      List<Task> tps = new List<Task>(); 

      foreach(var ip in ips) 
      { 
       // Delay could vary by IP, but I am hardcoding 10s here. 
       tps.Add(InitiatePings(ip, 10000)); 
      } 

      // Needed so that console app doesn't exit.. 
      Console.ReadLine(); 
     } 

     private static async Task InitiatePings(IPAddress ip, int delay) 
     { 
      while (true) 
      { 
       // Note, this API is different from SendAsync API you are using 
       // You may also want to reuse Ping instance instead of creating new one each time. 
       var result = await new Ping().SendPingAsync(ip); 
       // Process your result here, however you want. 
       Console.WriteLine(result.Address + "-" + result.Status + "-" + result.RoundtripTime); 
       // Assumes that the delay is not absolute, but time between ping, result processing and next ping. 
       await Task.Delay(delay); 
      } 
     } 
    } 
} 
1

爲什麼不設置一個Timer爲每個你想ping的IP地址?那就是:

List<System.Threading.Timer> timers = new List<System.Threading.Timer>(); 

您與添加各個計時器:

timers.Add(new Timer(PingTimerProc, "192.168.1.1", 400, 400)); 
timers.Add(new Timer(PingTimerProc, "10.10.200.50", 42000, 42000)); 
// add other timers 

而且你的計時器PROC:

void PingTimerProc(object state) 
{ 
    string ipAddress = (string)state; 
    // do the ping here 
} 

這裏唯一潛在的缺點是具有足夠大量的IP地址,你需要非常頻繁地ping,你冒着無法迅速處理所有定時器事件的風險。這最終會導致顛簸,因爲你會有一堆掛起的線程。

如果你用一個線程做這件事,並且你在這麼短的時間間隔內有很多ping,那麼你就會開始落後了。它不會最終殺死你的過程。

如果我要用一個線程來完成,我會創建一個IP地址和相關聯的ping時間的優先隊列。並設置一個長時間運行的任務來爲該隊列提供服務。其基本結構是這樣的:

class PingJob 
{ 
    string IPAddress; 
    int Period; // milliseconds between pings 
    TimeSpan NextPingTime; 
} 

var queue = new PriorityQueue<PingJob>(); 

// create the ping jobs and add them to the queue 
queue.Add(new PingJob { IPAddress = "192.168.1.1", Period = 400, NextPingTime = TimeSpan.FromMilliseconds(400) }); 
queue.Add(new PingJob { IPAddress = "10.10.200.50", Period = 42000, NextPingTime = TimeSpan.FromMilliseconds(42000) }); 
// etc. 

// start a thread that processes the pings 
var pingTask = Task.Factory.StartNew(PingTaskProc, TaskCreationOptions.LongRunning); 

,任務PROC:

void PingTaskProc() 
{ 
    var pingTimer = Stopwatch.StartNew(); 
    while (true) 
    { 
     var p = queue.Dequeue(); // get ping job 
     if (p.NextPingTime > pingTimer.Elapsed) 
     { 
      // wait for ping time to come up 
      Thread.Sleep(p.NextPingTime - pingTimer.Elapsed); 
     } 
     // ping the IP address 
     DoPing(p.IPAddress); 
     // update its next ping time 
     p.NextPingTime += TimeSpan.FromMilliseconds(p.Period); 
     // And add it back to the queue 
     queue.Add(p); 
    } 
} 

那假設你有一個通用的優先級隊列中的數據結構中可用。有很多可用的,包括mine

如果您可以從隊列中添加和刪除IP地址或更改ping時間,情況會變得更有趣。但你沒有具體說明。