2016-05-30 72 views
0

我對RxJS有點新,它踢我的屁股,所以我希望有人能幫助!爲什麼我的RxJS Observable馬上完成?

我在我的快遞服務器上使用RxJS(5)來處理行爲,我必須保存一堆Document對象,然後將其中的每個對象發送給他們的收件人。在我documents/create端點的代碼看起來是這樣的:

// Each element in this stream is an array of `Document` model objects: [<Document>, <Document>, <Document>] 
    const saveDocs$ = Observable.fromPromise(Document.handleCreateBatch(docs, companyId, userId)); 

    const saveThenEmailDocs$ = saveDocs$ 
     .switchMap((docs) => sendInitialEmails$$(docs, user)) 
     .do(x => { 
     // Here x is the `Document` model object 
     debugger; 
     }); 

    // First saves all the docs, and then begins to email them all. 
    // The reason we want to save them all first is because, if an email fails, 
    // we can still ensure that the document is saved 
    saveThenEmailDocs$ 
     .subscribe(
     (doc) => { 
      // This never hits 
     }, 
     (err) => {}, 
     () => { 
      // This hits immediately.. Why though? 
     } 
    ); 

sendInitialEmails$$函數返回可觀察到的,看起來像這樣:

sendInitialEmails$$ (docs, fromUser) { 
    return Rx.Observable.create((observer) => { 

     // Emails each document to their recepients 
     docs.forEach((doc) => { 
     mailer.send({...}, (err) => { 
      if (err) { 
      observer.error(err); 
      } else { 
      observer.next(doc); 
      } 
     }); 
     }); 

     // When all the docs have finished sending, complete the 
     // stream 
     observer.complete(); 
    }); 
    }); 

的問題是,當我訂閱saveThenEmailDocs$,我next處理程序是從不稱爲,並直接到complete。我不知道爲什麼...反過來,如果我從sendInitialEmails$$中刪除observer.complete()呼叫,則next處理程序每​​次都會被調用,而訂閱中的complete處理程序從不會被調用。

爲什麼不是預期的行爲nextnextcomplete發生,而不是它的一個或另一個...我錯過了什麼?

+0

嗨約翰尼,你有沒有能解決這個問題?我知道我遲到了,但最終你的解決方案呢? –

回答

0

我只能假設mailer.send是一個異步調用。 您的observer.complete()在所有異步調用已啓動時,但在任何調用完成之前調用。

在這種情況下,我會從docs數組中創建一個可觀察值的流,而不是像這樣包裝它。

或者,如果你想手工包裝成一個可觀察的,我建議你看看到庫異步和使用

async.each(docs, function(doc, callback) {...}, function finalized(err){...}) 
相關問題