假設你有一個messaage隊列具有下列API
class MQ {
public MQ();
// send a single message from your message queue
public void send(string keyPath, string msg);
// Receive a single message from your message queue
public async Task<string> receive(keyPath);
}
爲了使這RX兼容
class MQRX: IObserver<string> {
MQ _mq;
string _keyPath
MQRX(string keyPath){
_mq = mq;
_keyPath = keyPath;
}
IObservable<string> Observe(){
return Observable.Defer(()=> mq.receive(keyPath).ToObservable()).Repeat();
}
void OnNext(string msg){
_mq.send(msg);
}
void OnError(Exception e){
// The message queue might not
// support serializing exceptions
// or it might or you might build
// a protocol for it.
}
}
要在容錯方式來使用它。注意:如果拋出上游傳遞通過的OnError
new MQRX("users/1/article/2").
Retry().
Subscribe((msg)=>Console.Writeln(msg));
在寫作方面的異常 例如,你可以發送郵件每兩秒鐘,然後重試 訂閱到發電機,如果有一個錯誤的重試將重新訂閱。請注意,Observable.Interval中不太可能存在 錯誤,該錯誤每隔一段時間間隔都會生成一條消息,但 想象的是從文件或其他消息隊列中讀取數據。
var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
Select((x)=>x.ToString()).
注意你應該使用的IObservable抓住擴展方法,而不是一味地 重試,你可能會一遍又一遍的得到同樣的錯誤。 重試()。 訂閱(mq);
你能重命名這個問題嗎?看起來你並沒有問如何序列化一個可觀察的序列(這使我想你想序列化查詢 - > IQbservable),而是觀察序列的值。太pe了?在附註中,我認爲從隊列讀取/寫入更適合IEnumerable。 –