2017-05-26 96 views
1

在過去,我們使用了幾個angular2 websocket,但是我們對它們並不滿意,使用它們有幾個問題。所以我們決定嘗試用我們自己的RxJs來做我們的財富。Angular Websocket RxJS/WebSocket主題

這是我們第一次嘗試:

@Injectable() 
export class WebSocketService{ 

    public createWebsocket(url: string): Subject<MessageEvent> { 
     let socket = new WebSocket(url); 

     let observable = Observable.create(
            (observer: Observer<MessageEvent>) => { 
             socket.onmessage = observer.next.bind(observer); 
             socket.onerror = observer.error.bind(observer); 
             socket.onclose = observer.complete.bind(observer); 

             return socket.close.bind(socket); 
            }); 

     let observer = { 
      next: (data: Object) => { 
       if (socket.readyState === WebSocket.OPEN) { 
        socket.send(JSON.stringify(data)); 
       } 
      } 
     }; 

     return Subject.create(observer, observable); 
    } 
} 

插座被打開,工作了一段時間好。幾秒鐘後,瀏覽器關閉套接字,並在服務器端收到關閉事件。

這是封閉的原因,我們得到服務器站點: [1006] WebSocket的閱讀EOF

有誰可以幫忙嗎?還是有人知道如何使用WebSocketSubject?

+0

https://gearheart.io/blog/auto-websocket-reconnection-with-rxjs/ –

+0

@JuliaPassynkova thanx爲您的鏈接,我已經找到了這個解決方案,這很好。這個解決方案的問題是,總是存在關閉Websocket的問題,這個解決方案在關閉連接時沒有問題。但它就像是第一次連接,在服務器上,第一次連接會花費很多昂貴的東西。 因此,如果連接永遠不會從瀏覽器端關閉,那將會更好。 所以問題是,爲什麼瀏覽器正在關閉websocket? –

+0

由於您似乎已經完成了您自己的websocket實現,請您分享一下您自己的解決方案嗎? – BlackHoleGalaxy

回答

1

我不`噸知道,如果它仍然是相關的,但我沒有使用從角的WebSocket連接與https://github.com/ohjames/rxjs-websockets

一些我在那裏我打電話的ServerSocketService在組件內部所做的修改的類似的東西(專供連接到基於websocket的端點)即重試機制,我使用ReplaySubject而不是示例中給出的QueuingSubject。

@Injectable() 
export class ServerSocket { 
    // private inputStream: QueueingSubject<string>; 
    private inputStream: ReplaySubject<string>; 
    public messages: Observable<string>; 
    private subscription: Subscription; 
    private websocket: WebSocket; 

    public connect() { 
     if (this.messages) { 
      return; 
     } 
     console.log('inside connect'); 
     // this.inputStream = new QueueingSubject<string>(); 
     this.inputStream = new ReplaySubject(); 

     // Using share() causes a single websocket to be created when the first 
     // observer subscribes. This socket is shared with subsequent observers 
     // and closed when the observer count falls to zero. 
     this.messages = websocketConnect(
      'ws://localhost:9097/echo', 
      this.inputStream 
     ).messages.share(); 


     this.messages.retryWhen(errors => errors.delay(1000)).subscribe(message => { 
      console.log('error', message); 
     }); 
    } 

    public send(message: string): void { 
     // If the websocket is not connected then the QueueingSubject will ensure 
     // that messages are queued and delivered when the websocket reconnects. 
     // A regular Subject can be used to discard messages sent when the websocket 
     // is disconnected. 

     this.inputStream.next(message); 

    } 
} 

然後在組件的OnInit生命週期內進行連接,訂閱然後發送消息。一旦組件到達其OnDestroy生命週期,就通過取消訂閱來釋放資源。

+0

這看起來不錯,同時我們已經適應了我們自己的websocket實現,但是如果我們有時間的話我們也會看看 –