2017-09-20 36 views
0

上下文:我有一個應用程序的日誌記錄庫,我想使用Rx Subject將日誌寫入數據庫。 問題:在建立與數據庫的連接之前會生成日誌,並且連接可能隨時變得不可用,但是我有和可觀察到的情況是要麼包含具有用於使用數據庫的方法的對象,要麼包含未定義的方法。我想要做的是在數據庫持久性未定義時將日誌保留在流中,並在可用時恢復。RxJS在外部條件下從流中提取數據

我認爲代碼應該是這個樣子:

logsSubject 
      .takeWhen(/* test for database persistence */) // made up name because I don't know a Rx method that does what I want 
      .subscribe(/* write data to database */); 

我不但是知道這實際上是可能的,我使用RxJS了很短的時間

回答

0

你可以得到做到這一點的其他方式:

Observable.combineLatest(dbConnection, logsSubject, (db, logs) => { 
    // whatever: db.save(...) 
    }) 
    .do(/* or since it's a side effect it should probably go here instead */) 

這樣combineLatest將等待兩個源可觀察EMIT和將舉行的dbConnection即使emitte只有一個項目。

+0

這是第一次,但'dbConnection'發出多次,因爲連接可以在引導後重置,並且我想只有在'!! db === true'時才從logsSubject提取數據。 –

+0

然後你可以用'dbConnection.filter(db => !! db)'把它鏈接到'combineLatest' – martin

1

以下代碼將模擬您的場景。數據庫將每10秒連接一次,並在連接後每隔5秒斷開連接。在實際使用情況下,連接和斷開應該共享Observable或Subject。我不是使用takeUntil操作符,而是在這裏使用合併,因爲takeUntil只會完成可觀察的序列,您將無法恢復它。 (repeatWhen運營商應該工作,但它似乎Rxjs 5人失蹤)

//emit value every 1s 
const logWrite = Rx.Observable.interval(1000) 

const dbConnected = new Rx.Subject() 
// make it fire every 10s 
Rx.Observable.interval(10000).do(()=>dbConnected.next('db 
connected')).subscribe() 

const dbDisconnected = Rx.Observable.timer(3000) 
.flatMap(()=>Rx.Observable.throw('db disconnected')) 


const writeWhenConnected = logWrite.merge(dbDisconnected) 
.retryWhen(function(errors) { 
    return errors.switchMap(()=>dbConnected); 
}); 

writeWhenConnected.subscribe(val => console.log(val),console.warn); 
0

似乎什麼,我需要從RxJS4被pausableBuffered但因爲它是在RxJS5被刪除我結束了以下解決方案:

let dbAvailability = dbSubject 
     .switchMap((db) => db ? logSubject : Rx.Observable.never()); 

//get data only when dbAvailability emits, and that happens only when I have a connection 
let bufferedLogs = logSubject.buffer(dbAvailability); 

bufferedLogs 
    .concatAll() 
    .subscribe(/* save to database */);