3

我需要將處理序列(如在此問題How to organize sequence of data processors with .net RX中)拆分爲Azure環境中的多個計算單元。
這個想法是將Observable序列序列化爲Azure隊列(或服務總線)並將其反序列化。
如果生產者或消費者失敗,另一方應該能夠繼續生產/消費。如何將觀測數據序列化到雲端並返回

任何人都可以建議一個優雅的方式來做到這一點和使用什麼(Azure隊列或服務總線)?
有沒有人使用TCP Observable provider - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider這樣的問題對一方的失敗是否安全?

+0

你能重命名這個問題嗎?看起來你並沒有問如何序列化一個可觀察的序列(這使我想你想序列化查詢 - > IQbservable),而是觀察序列的值。太pe了?在附註中,我認爲從隊列讀取/寫入更適合IEnumerable。 –

回答

2

假設你有一個messaage隊列具有下列API

class MQ { 

    public MQ(); 

    // send a single message from your message queue 
    public void send(string keyPath, string msg); 

    // Receive a single message from your message queue 
    public async Task<string> receive(keyPath); 

} 

爲了使這RX兼容

class MQRX: IObserver<string> { 
    MQ _mq; 
    string _keyPath 

    MQRX(string keyPath){ 
     _mq = mq; 
     _keyPath = keyPath; 
    } 

    IObservable<string> Observe(){ 
     return Observable.Defer(()=> mq.receive(keyPath).ToObservable()).Repeat(); 
    } 

    void OnNext(string msg){ 
     _mq.send(msg); 
    } 

    void OnError(Exception e){ 
     // The message queue might not 
     // support serializing exceptions 
     // or it might or you might build 
     // a protocol for it. 
    } 
} 

要在容錯方式來使用它。注意:如果拋出上游傳遞通過的OnError

new MQRX("users/1/article/2"). 
    Retry(). 
    Subscribe((msg)=>Console.Writeln(msg)); 

在寫作方面的異常 例如,你可以發送郵件每兩秒鐘,然後重試 訂閱到發電機,如果有一個錯誤的重試將重新訂閱。請注意,Observable.Interval中不太可能存在 錯誤,該錯誤每隔一段時間間隔都會生成一條消息,但 想象的是從文件或其他消息隊列中讀取數據。

var mq = new MQRX("users/1/article/2"); 

Observable.Interval(TimeSpan.FromSeconds(2)). 
    Select((x)=>x.ToString()). 

注意你應該使用的IObservable抓住擴展方法,而不是一味地 重試,你可能會一遍又一遍的得到同樣的錯誤。 重試()。 訂閱(mq);

0

我在30行VB代碼中寫了自己的UDP到RX包裝器,它處理超時。 TCP包裝將會類似我猜。

Imports System.Reactive 
Imports System.Reactive.Linq 
Imports System.Reactive.Threading.Tasks 
Imports System.Threading 
Imports System.Net 
Imports System.Net.Sockets 

Public Module UDP 
    ''' <summary> 
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional 
    ''' timeout value between results. 
    ''' </summary> 
    ''' <param name="timeout"></param> 
    ''' <returns></returns> 
    ''' <remarks></remarks> 
    Public Function StreamObserver(
       localPort As Integer, 
       Optional timeout As Nullable(Of TimeSpan) = Nothing 
       ) As IObservable(Of UdpReceiveResult) 

     Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
      Function() New UdpClient(localPort), 
      Function(udpClient) 
       Dim o = Observable.Defer(Function() udpClient. 
                   ReceiveAsync(). 
                   ToObservable()) 
       If Not timeout Is Nothing Then 
        Return Observable.Timeout(o.Repeat(), timeout.Value) 
       Else 
        Return o.Repeat() 
       End If 
      End Function 
     ) 
    End Function 
End Module 

如果需要,您可以在寫入端執行相同的操作。然後,您只需使用常規的RX技術將您的數據序列化到UDP幀中即可。

new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)). 
     Select(function(udpResult) MyDeserializeFunction(udpResult)). 
     Subscribe(sub (result) DoSomething(result), 
        sub(error) DoSomethingWithError) 
+0

那麼「如果生產者或消費者失敗了,對方應該能夠繼續生產/消費」部分呢? –

+0

您想查看IObservable的Catch擴展方法,該方法允許您恢復失敗的Observable。但是你不得不問自己是中間的MessageBus一個持久隊列嗎?例如,如果它是UDP,並且生產者丟失了連接,則恢復它,如果生產者同時傳送它們,它將丟失數據包。如果您使用持久性消息隊列,則根據您的設置,您可以從消費者/生產者中的錯誤中恢復,而不會丟失消息。 – bradgonesurfing

+0

持久性消息隊列的例子就是我正在尋找的。儘管Udp和Tcp示例很好啓動。 –

相關問題