2017-04-10 220 views
2

我的角度應用程序使用websocket與後端進行通信。RxJs使用WebSocket可觀察

在我的測試案例中,我有2個客戶端組件。 Observable timer按預期打印兩個不同的客戶端ID。

每個ngOnInit()還會打印其客戶端的ID。

現在由於某種原因,websocketService.observeClient()的訂閱被稱爲每個消息2次,但this.client.id總是打印第二個客戶端的值。

我的繼承人客戶端組件

@Component({ 
... 
}) 
export class ClientComponent implements OnInit { 

    @Input() client: Client; 

    constructor(public websocketService: WebsocketService) { 
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id)); 
    } 

    ngOnInit() { 

    console.log(this.client.id); 
    this.websocketService.observeClient().subscribe(data => { 
     console.log('message', this.client.id); 
    }); 

    } 

} 

而且我的WebSocket服務

@Injectable() 
export class WebsocketService { 

    private observable: Observable<MessageEvent>; 
    private observer: Subject<Message>; 

    constructor() { 

    const socket = new WebSocket('ws://localhost:9091'); 

    this.observable = Observable.create(
     (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
     } 
    ); 

    this.observer = Subject.create({ 
     next: (data: Message) => { 
     if (socket.readyState === WebSocket.OPEN) { 
      socket.send(JSON.stringify(data)); 
     } 
     } 
    }); 

    } 

    observeClient(): Observable<MessageEvent> { 
    return this.observable; 
    } 

} 

編輯

好,據我已經閱讀它的事實,做觀測量是單播對象,我必須使用主題,但我不知道如何創建主題。

+0

你確定這個問題是不是在你的'providers'配置?從你的描述看來,你希望每個客戶端組件都有自己的'WebsocketService'實例。 – martin

+0

不應該有一個注入websocketService連接到後端 – Pascal

回答

1

您需要使用share運營商在訂戶之間共享它。

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
    } 
).share(); 

此外,請確保此服務是單身。

文件:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md

0

由於rxjs 5,你可以使用內置的WebSocket功能,爲您創建的主題。當您在錯誤發生後重新訂閱流時,它也會重新連接。請參考這個答案:

https://stackoverflow.com/a/44067972/552203

TLDR:

let subject = Observable.webSocket('ws://localhost:8081'); 
subject 
    .retry() 
    .subscribe(
     (msg) => console.log('message received: ' + msg), 
     (err) => console.log(err), 
    () => console.log('complete') 
    ); 
subject.next(JSON.stringify({ op: 'hello' }));