2015-09-27 86 views
1

我試圖將Node的套接字轉換爲使用RxJS的流。目標是讓每個套接字創建它自己的流,並將所有流合併爲一個。當新的套接字連接時,將使用socketStream = Rx.Observable.fromEvent(socket, 'message')創建一個流。RxJS:將Node.js套接字轉換爲Observable並將它們合併爲一個流

然後流合併到的東西主流像

mainStream = mainStream.merge(socketStream)

這似乎做工精細,問題是,經過200-250客戶端連接,服務器拋出RangeError: Maximum call stack size exceeded

我有樣的服務器和客戶端代碼,演示了在這裏要點此行爲: Sample Server and Client

我懷疑,作爲客戶端連接/斷開,主流不正確清理。

回答

3

問題是您正在遞歸合併您的Observable。每次你做

cmdStream = cmdStream.merge(socketStream); 

你正在創建一個新的MergeObservable/MergeObserver對。

看一下source,你可以看到你基本上對每個訂閱所做的事情是按順序訂閱你以前的每一個流,所以不應該很難看到在250個連接處你的調用棧可能至少有1000個電話深度。

解決此問題的更好方法是轉換使用flatMap運算符並將您的連接視爲創建ObservableObservables

//Turn the connections themselves into an Observable 
var connections = Rx.Observable.fromEvent(server, 'connection', 
        socket => new JsonSocket(socket)); 

connections 
    //flatten the messages into their own Observable 
    .flatMap(socket => { 
    return Rx.Observable.fromEvent(socket.__socket, 'message') 
      //Handle the socket closing as well 
      .takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close')); 
    }, (socket, msg) => { 
    //Transform each message to include the socket as well. 
    return { socket : socket.__socket, data : msg}; 
    }) 
    .subscribe(processData, handleError); 

上面我沒有測試過,但應該修復你的SO錯誤。

我也許會質疑這個的總體設計。通過將所有Observable合併在一起,你究竟獲得了什麼?您仍然通過將套接字對象與消息一起傳遞來區分它們,因此看起來這些可能都是不同的流。

+0

哦,不僅是這個解決了我的問題,它也將連接變成流(我想這樣做,但不明白flatMap),並在10行代碼(從我的版本100+是我爲了要點而清理的)。謝謝! ++啤酒 –

+0

哦,關於設計,正如你可以告訴我只是進入Rx的東西,所以這部分是「一切都是流」的思維練習,也意圖以後能夠緩衝和設置一些如果需要的話反壓。如果所有傳入的消息都在一個流中,緩衝將更直接。 –

+0

非常歡迎。這不是一條硬性和快速的規則,對於你的情況來說,合併更有意義。無論哪種方式歡迎到溪流之美! – paulpdaniels

相關問題