2017-08-24 27 views
2

我有一個可觀察的鏈,最初的觀察者來自網絡,並且每當消息準備被讀取時就會被觸發。接下來的處理程序然後讀取消息並將其反序列化。現在我有一個observable分支,一個是消息處理程序,另一個是記錄消息。f#可觀察到的叉子和副作用

的問題是,因爲我觀察到的使用我實際上將嘗試讀取消息的兩倍。

據我所知,使用事件,而不是可觀察就能解決問題,但我會再有一個垃圾收集問題,可能導致無法收集套接字。

一種解決方案我想的是插入某種分離器的其中可觀察結束的一個鏈,並創建一個新的,不這樣的功能已經存在作爲fsharp或其它文庫庫的一部分。

是否有其他解決方案?

編輯:

代碼示例不正常工作

let messagesStream = 
    socket.observable |> 
    Observable.map (fun() -> socket.read()) |> 
    Observable.map (fun m -> deserialize m) 

messagesStream |> Observable.add (fun m -> printf m) 
messagesStream |> Observable.add (fun m -> handle m) 
+3

在一邊:你應該把推進管道* *前的功能,你管到,不*背後*值你管。將它們排列起來使得它更清晰。 – TeaDrivenDev

+2

對ab可觀察管道的每個訂閱都會導致訂閱源可觀察項。您正在爲'socket.observable'創建兩個訂閱。您應該嘗試在'messagesStream' observable的末尾放置一個發佈操作符。 – Enigmativity

+0

謝謝,我發現關於發佈,你知道任何與發佈功能的f#庫嗎?如果您可以添加答案和代碼示例,我會將其標記爲答案。謝謝 – somdoron

回答

0

這聽起來像你可以創建一個可觀察到的,從網絡處理該信息的讀取和反序列化。假設這是使用標準的Rx操作符完成的,那麼應該返回一個觀察值,它推動新的反序列化的網絡消息。

你可以有2個訂戶可觀察的,一個反應與任何業務邏輯,你想和第二用戶登錄消息新消息。

這應該消除多次從網絡上讀取的副作用。推送2份反序列化的消息不應帶來副作用。

+0

我用一個不起作用的例子編輯了代碼。 – somdoron

+0

我不確定我是否理解你的答案。你所建議的不是使用Observable.map,而是訂閱網絡,然後生成新的可觀察數據?你能給出一個代碼示例嗎? – somdoron

+0

看到您發佈的代碼,看起來您嘗試了我在回答中提出的建議。什麼不工作? –

0

添加一些記錄,最簡單的方法是使用Observable.iter如下:

let messagesStream = 
    socket.observable 
    |> Observable.map (fun() -> socket.read()) 
    |> Observable.map (fun m -> deserialize m) 
    |> Observable.iter (printfn "%A") 

messagesStream |> Observable.add (fun m -> handle m)