2017-10-09 70 views
2

我想答案是forkJoin/Promises.all ...但它多一點,請與我裸...異步操作的可觀測(主題)時,中間有沒有重疊的異步操作是完成

所以......我有一個承諾的來源,可以隨機順序到達,我需要一些方式來說「當所有承諾到期爲止,完成後讓我知道」。

在一個基於Promise的解決方案中,我最初考慮使用Promise.all,但承諾可能仍然「到達」,而其他人尚未完成。有趣的是,對於「迭代Promise.all」有一個整潔的解決方法,在https://stackoverflow.com/a/37819138/239168

我試圖做到這一點的方式。稍後閱讀文檔,我認爲forkJoinPromise.all等效,但是,同樣的問題,我沒有時間可以安全地呼叫forkJoinPromise.all,因爲總是可以再添加一個,而另一個仍處於待定狀態。因爲我現在可能沒有意義,所以我想我會要求一些指導。

設置

(握住你的笑,如果這是愚蠢的,我是新來的Rx ...)

我有一個主題,我想,當知道所有的承諾,在這是完整的......還總是可以在任何時候得到新的補充承諾......

private promiseSource = new Subject<Promise<any>>(); 
promises$ = this.promiseSource.asObservable(); 

每當一個新的承諾「到達」,我只是將它添加到受

this.promiseSource.next(somePromise); 

我想要奇蹟般地發生的事情是 - 只要它擁有完整的承諾,就讓主題「完成」。

例如

promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({ 
    next: x => ..., 
    error: err => ..., 
    complete:() => { 
    console.log('all promises we got so far are done'); 
    // nice to have, I want this to keep "listening" for new promises 
    promiseSource.youAreNotREALYCompletePleaseReset(); 
    } 
}); 

或者換句話說,我有一個觀察的異步操作的,如果我們看一看的內容,我們可以看到重疊的異步操作,我想知道什麼時候有沒有重疊例如

|<-async action 1->| |<-async action 3->| 
      |<-async action 2->|      |<-async action 4->| 

              /\  /\ 
             find this gap 

如果這些是例如http調用,我基本問 - 告訴我什麼時候沒有打開http調用。

TL;博士

如何落實這一承諾在RxJS世界基於答案...

https://stackoverflow.com/a/37819138/239168

回答

1

我能想到的做着這基於一個相當簡單的方法先前的回答。您可以使用fromPromise將您的Subject<Promise<any>>變成Subject<Observable<any>>,然後您可以使用this答案中描述的active函數將其降至可觀察的活動可觀察值。一旦你有了這些,你可以將你的查詢短語定義爲「當活動流數組變爲空時」,這可以用一個簡單的過濾器來完成,例如,:

active(yourSubjectOfObservables).filter(x => x.length === 0).subscribe(() => { 
    // here we are, all complete 
}); 

這將每次觸發活動數據流轉換的次數爲零,所以如果你只是想在第一時間,將一個.take(1)或。 first之間的過濾器和訂閱。

可能不是最漂亮的解決方案,但它在概念上很簡單。

+0

酷,會試試看......我還在攀登懸崖:) –

2

如果我正確解釋你的問題,你只對表示是否有未決承諾的信號感興趣。

使用mergescan來創建一個觀測值可以很容易地發出未決承諾的計數,因此,您應該可以創建任何您喜歡的信號。

基本上,每次受試者發出承諾時,應該增加未決承諾的次數。每當這些承諾中的一個解決時,計數可以遞減。

const promises = new Rx.Subject(); 
 

 
const pendingCount = Rx.Observable 
 
    .merge(
 
    promises.mapTo(1), 
 
    promises.mergeMap(p => Rx.Observable.from(p).mapTo(-1)) 
 
) 
 
    .scan((acc, value) => acc + value, 0) 
 
    .do(count => console.log(`${count} pending promise(s)`)); 
 

 
const doneSignal = pendingCount 
 
    .filter(count => count === 0) 
 
    .mapTo("done"); 
 

 
doneSignal.subscribe(signal => console.log(signal)); 
 

 
const timeoutPromise = (delay) => new Promise(resolve => setTimeout(resolve, delay)); 
 

 
promises.next(timeoutPromise(200)); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 100); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 300); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>