2017-10-04 82 views
0

我有以下代碼:等待所有觀察到的與rxjs

this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => { 

    let hangars: IHangar[] = result.arguments[0]; 

    for (let hangar of hangars) { 
     this.pieceService.getGroupedPieces(hangar.pieces).subscribe(group => hangar.groupedPieces = group); 
    } 

    this.hangars$.next(hangars); 
}, (ex: any) => this.hangars$.error(ex)); 

所以基本上,sendScopeCommand通過的WebSocket發送的東西,當WebSocket的接收到響應執行then功能。在這一點上,我得到了一個對象的數組,我把它放在hangars

在這些對象中,我有一個玩家擁有的所有棋子的數組。可以有多個相同的片類型,所以我做了一個函數來將它們分組:getGroupedPieces。它的代碼如下:

public getGroupedPieces(pieces: IPiece[]): Observable<IGroupedPiece[]> { 
    return Observable 
     .from(pieces) 
     .groupBy(p => p.pieceTypeId) 
     .flatMap(p => p.toArray()) 
     .map(p => { return <IGroupedPiece>{ amount: p.length, piece: p[0] }; }) 
     .toArray(); 
} 

此代碼的工作原理,但我敢肯定,這是不正確的。事實上,我認爲hangars甚至在for循環中的可觀測值完成之前在可觀測值上發射。

我想在這裏等待所有這些observable完成,然後在Observable上發射hangars

+0

爲什麼不發射訂閱被調用時的事件? –

回答

0

我想你可以嘗試使用RxJS的forkJoin操作:

this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => { 

    let hangars: IHangar[] = result.arguments[0]; 
    let hangars$$: Observable<IHangar>[] = hangars.map(hangar => { 
     return this.pieceService.getGroupedPieces(hangar.pieces) 
    }) 

    Observable 
     .forkJoin(...hangars$$) 
     .subscribe(groups => { 
     groups.forEach((group, i) => hangars[i].groupedPieces = group) 
     this.hangars$.next(hangars); 
     }) 

}, (ex: any) => this.hangars$.error(ex)); 
+0

太棒了,謝謝 – ssougnez

2

就我個人而言,我嘗試儘可能少地調用訂閱,特別是避免可能的訂閱嵌套。

我把一個快速的角度組件放在一起給我如何處理它的樣本。

對不起,如果它的原油或拼寫錯誤(斷鎖骨只用一隻手來使用)。

import { Component, OnInit } from '@angular/core'; 
import { Observable, Subject } from 'rxjs'; 

interface hangar { 
    pieces: number[]; 
    groupedPieces: number[]; 
} 

@Component({ 
    selector: 'app-root', 
    templateUrl: './app.component.html', 
    styleUrls: ['./app.component.css'] 
}) 
export class AppComponent { 
    title = 'app works!'; 
    //output mock 
    hangars$: Subject<hangar[]> = new Subject<hangar[]>(); 

    ngOnInit() { 
     //proof of functionality 
     this.hangars$.subscribe(h => console.log(h)); 
    } 

    //method to mock the then call from example 
    start() { 

     //mock some data 
     let hangars: hangar[] = [{ pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }]; 

     //subject to handle observable clean up 
     let subManagement$: Subject<any> = new Subject<any>(); 
     let obsArr: Observable<number[]>[] = []; 

     //here were going to build an array the observables but not subscribe to them yet 
     hangars.forEach(hangar => 
      obsArr.push(
       this.getGroupedPieces(hangar.pieces) 
        .takeUntil(subManagement$) 
        .do(group => hangar.groupedPieces = group) 
      ) 
     ); 

     //real magic, this waits for all of the observables responses before emitting its value 
     Observable.combineLatest(obsArr).subscribe(
      () => this.hangars$.next(hangars), 
      null, 
      () => subManagement$.next()//cleanup 
     ); 
    } 

    //mock out your service 
    private getGroupedPieces(pcs: number[]): Observable<number[]> { 

     return Observable.of([1, 2, 3, 4]).delay(1000); 
    } 
} 
+0

如果您將Observable更改爲承諾,則可以使用'Promise.all'來等待所有承諾響應並繼續。 – Dekonunes