2012-02-08 70 views
4

我正在從我們的ASP.NET站點刪除我們的電子郵件系統,該站點用於立即用系統發送電子郵件,以便在單獨的服務中處理請求以減少網站的工作量。我試圖圍繞一組接口進行設計,以便我可以交換實現(如果需要的話),但最初它將基於消息隊列(MSMQ)將請求發送到隊列,讓服務接收傳入的請求然後處理它們。我現在有一個粗略的定義以下接口:如何創建從MSMQ消息隊列中讀取的IObservable <T>?

// Sends one or more requests to be processed somehow 
public interface IRequestSender 
{ 
    void Send(IEnumerable<Request> requests); 
} 

// Listens for incoming requests and passes them to an observer to do the real work 
public interface IRequestListener : IObservable<Request> 
{ 
    void Start(); 
    void Stop(); 
} 

// Processes a request given to it by a IRequestListener 
public interface IRequestProcessor : IObserver<Request> 
{ 
} 

你會發現,監聽器和處理器使用可觀察的模式,因爲這是我認爲似乎適合最好。

我的問題是搞清楚如何編寫從MSMQ接收,基本上我怎麼創建一個合適的IObservable<T>IRequestListener的實現?

我發現的第一個選擇是根據MSDN documentation給出的例子從零開始創建一個IObservable<T>,但這看起來像是很多管道工作要做的事情。

另一種選擇是使用Reactive Extensions,因爲它似乎被設計成使創建observable更容易。我發現使用的Rx與MSMQ最接近的是這些網頁:

但我不知道我怎麼可以將這些例子我IRequestListener接口。

任何其他的想法也歡迎,甚至如果他們適合我的基本設計的變化。

回答

5

我最初使用FromAsyncPattern,但後來最終爲它寫了一個類,因爲它更好地處理了超時和中毒消息。一旦開始,隊列無論如何都是熱點觀察對象。您也可以使用Observable.Defer使其更接近Rx而不是啓動/停止。

以下是QueueObservable的基本實現。您可以通過致電ListenReceive開始。

Subject<T> Subject = new Subject<T>(); 

protected void ListenReceive() 
{ 
    Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive); 
} 

protected void OnReceive(IAsyncResult ar) 
{ 
    Message message = null; 

    try 
    { 
     message = Queue.EndReceive(ar); 
    } 
    catch (TimeoutException ex) 
    { 
     //retry? 
    } 

    if (message != null) 
     Subject.OnNext((T) message.Body); 

    Thread.Yield(); 

    if (!IsDisposed) 
     ListenReceive(); 
}  

public IObservable<T> AsObservable() 
{ 
     return Subject; 
} 
+0

我在做沿着相同的路線,因爲這一些嘗試看到這個答案之前,使用'主題'內部並幫助跟蹤訂閱和我的實現是從您的建議不是一個百萬英里的路程,謝謝。 – 2012-02-10 11:07:12

+0

@PeterMonks沒問題。 – Asti 2012-02-10 13:12:17

相關問題