2011-04-13 138 views
3

我已經使用觀察者模式爲我的應用程序。帶計時器的觀察者模式

我有有一個System.Timers.Timer對象在它命名爲「TMR」的主題。此計時器的滴答事件在每60秒後觸發。在此蜱活動中,我將通知所有附屬於我的主題的觀察員。我已經使用for循環遍歷我的觀察者列表&,然後觸發觀察者更新方法。

假設我有10個觀察員附加到我的主題。

每個觀察者需要10秒來完成其處理。

現在在for循環中完成通知會導致最後一個觀察者的更新方法在90秒後被調用。即僅在前一個完成其處理之後調用Next Observer Update方法。

但這並不是我想要的應用程序。我需要所有觀察者的Update方法在發生計時器滴答時立即觸發。所以沒有觀察者必須等待。我希望這可以通過線程來完成。

所以,我修改了代碼,

// Fires the updates instantly 
    public void Notify() 
    { 
     foreach (Observer o in _observers) 
     { 
     Threading.Thread oThread = new Threading.Thread(o.Update); 
     oThread.Name = o.GetType().Name; 
     oThread.Start(); 
     } 
    } 

但我有我的兩個疑惑,

  1. 如果有10個觀察員 我的計時器的時間間隔爲60秒 然後聲明新線程()將會觸發600次。

    它是否有效,並建議在每個計時器刻度上創建新線程?

  2. 如果我的觀察者花費太多時間來完成更新邏輯,即超過60秒,該怎麼辦?意味着在觀察者更新之前發生計時器滴答聲。我怎樣才能控制這個?

我可以張貼的示例代碼。如果需要......

我使用的代碼..

using System; 
using System.Collections.Generic; 
using System.Timers; 
using System.Text; 
using Threading = System.Threading; 
using System.ComponentModel; 

namespace singletimers 
{ 
    class Program 
    { 


    static void Main(string[] args) 
    { 
     DataPullerSubject.Instance.Attach(Observer1.Instance); 
     DataPullerSubject.Instance.Attach(Observer2.Instance); 
     Console.ReadKey(); 
    } 
    } 

    public sealed class DataPullerSubject 
    { 
    private static volatile DataPullerSubject instance; 
    private static object syncRoot = new Object(); 
    public static DataPullerSubject Instance 
    { 
     get 
     { 
     if (instance == null) 
     { 
      lock (syncRoot) 
      { 
      if (instance == null) 
       instance = new DataPullerSubject(); 
      } 
     } 

     return instance; 
     } 
    } 

    int interval = 10 * 1000; 
    Timer tmr; 
    private List<Observer> _observers = new List<Observer>(); 

    DataPullerSubject() 
    { 
     tmr = new Timer(); 
     tmr.Interval = 1; // first time to call instantly 
     tmr.Elapsed += new ElapsedEventHandler(tmr_Elapsed); 
     tmr.Start(); 
    } 

    public void Attach(Observer observer) 
    { 
     _observers.Add(observer); 
    } 

    public void Detach(Observer observer) 
    { 
     _observers.Remove(observer); 
    } 

    // Fires the updates instantly 
    public void Notify() 
    { 
     foreach (Observer o in _observers) 
     { 
     Threading.Thread oThread = new Threading.Thread(o.Update); 
     oThread.Name = o.GetType().Name; 
     oThread.Start(); 
     } 
    } 

    private void tmr_Elapsed(object source, ElapsedEventArgs e) 
    { 
     tmr.Interval = interval; 
     tmr.Stop(); // stop the timer until all notification triggered 
     this.Notify(); 
     tmr.Start();//start again 
    } 
    } 


    public abstract class Observer 
    { 
    string data; 
    public abstract void Update(); 
    public virtual void GetDataFromDBAndSetToDataSet(string param) 
    { 
     Console.WriteLine("Processing for: " + param); 
     data = param + new Random().Next(1, 2000); 
     Threading.Thread.Sleep(10 * 1000);//long work 
     Console.WriteLine("Data set for: " + param); 
    } 
    } 


    public sealed class Observer1 : Observer 
    { 
    private static volatile Observer1 instance; 
    private static object syncRoot = new Object(); 
    public static Observer1 Instance 
    { 
     get 
     { 
     if (instance == null) 
     { 
      lock (syncRoot) 
      { 
      if (instance == null) 
       instance = new Observer1(); 
      } 
     } 

     return instance; 
     } 
    } 
    Observer1() 
    { 
    } 
    public override void Update() 
    { 
     base.GetDataFromDBAndSetToDataSet("Observer1"); 
    } 

    } 

    public sealed class Observer2 : Observer 
    { 
    private static volatile Observer2 instance; 
    private static object syncRoot = new Object(); 
    public static Observer2 Instance 
    { 
     get 
     { 
     if (instance == null) 
     { 
      lock (syncRoot) 
      { 
      if (instance == null) 
       instance = new Observer2(); 
      } 
     } 

     return instance; 
     } 
    } 
    Observer2() 
    { 
    } 
    public override void Update() 
    { 
     base.GetDataFromDBAndSetToDataSet("Observer2"); 
    } 

    } 
} 

感謝&親切的問候。

+0

有更好的方法來在c#中實現單例模式 - 請參閱http://www.yoda.arachsys.com/csharp/singleton.html – GarethOwen 2011-04-19 11:31:01

回答

1
  • 使用new Thread是不鼓勵的。使用TaskTask<T>
  • 您最好的創建Observable模式框架的嘗試可能只會接近Rx。使用那個解決你提到的問題(即如果處理花費太多時間)。 Rx將爲您定義可觀察的場景提供巨大的靈活性。
0

或者,觀察者可以以非阻塞的方式實現更新。 也就是說,更新總是立即返回。然後,如果需要,Observer對象有責任在新線程中執行其工作。

我不確定這是否有助於您的情況 - 我不知道您的'觀察員'是什麼,但是也可能您不知道?

+0

@Garen:我不明白你用「非阻塞方式」 。在我的觀察員中,我從數據庫中獲取數據。所以在這裏它可能會花費幾分鐘的時間。這樣每個觀察員都可以更新他們的公共領域。 – thinkmmk 2011-04-13 15:52:57

+0

@thinkmmk - 我的意思是對Update的調用立即返回,觀察者負責在單獨的線程中啓動Update操作。當更新正在進行時,觀察員也負責排隊或忽略更新呼叫。但是這取決於你的觀察者是否這是一個好的決定 - 你想避免在不同的'觀察者'類中重複相同的邏輯。 – GarethOwen 2011-04-14 06:54:25

+0

我已經用代碼編輯了我原來的帖子..現在我正在努力爭取同步,即不應再次調用obesever'x',除非它已完成其最後的工作。 – thinkmmk 2011-04-19 09:52:24