問題是您正在遞歸合併您的Observable
。每次你做
cmdStream = cmdStream.merge(socketStream);
你正在創建一個新的MergeObservable/MergeObserver
對。
看一下source,你可以看到你基本上對每個訂閱所做的事情是按順序訂閱你以前的每一個流,所以不應該很難看到在250個連接處你的調用棧可能至少有1000個電話深度。
解決此問題的更好方法是轉換使用flatMap
運算符並將您的連接視爲創建Observable
的Observables
。
//Turn the connections themselves into an Observable
var connections = Rx.Observable.fromEvent(server, 'connection',
socket => new JsonSocket(socket));
connections
//flatten the messages into their own Observable
.flatMap(socket => {
return Rx.Observable.fromEvent(socket.__socket, 'message')
//Handle the socket closing as well
.takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close'));
}, (socket, msg) => {
//Transform each message to include the socket as well.
return { socket : socket.__socket, data : msg};
})
.subscribe(processData, handleError);
上面我沒有測試過,但應該修復你的SO錯誤。
我也許會質疑這個的總體設計。通過將所有Observable
合併在一起,你究竟獲得了什麼?您仍然通過將套接字對象與消息一起傳遞來區分它們,因此看起來這些可能都是不同的流。
哦,不僅是這個解決了我的問題,它也將連接變成流(我想這樣做,但不明白flatMap),並在10行代碼(從我的版本100+是我爲了要點而清理的)。謝謝! ++啤酒 –
哦,關於設計,正如你可以告訴我只是進入Rx的東西,所以這部分是「一切都是流」的思維練習,也意圖以後能夠緩衝和設置一些如果需要的話反壓。如果所有傳入的消息都在一個流中,緩衝將更直接。 –
非常歡迎。這不是一條硬性和快速的規則,對於你的情況來說,合併更有意義。無論哪種方式歡迎到溪流之美! – paulpdaniels