2016-08-21 49 views
0

還有就是我如何保存我的數據與RxJava:SQLite的交易和RxJava

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnUnsubscribe { dbKeeper.endTransaction() } 
} 

然後我用這個方法是這樣的:

 notesManager.put(note) 
      .switchMap { notesManager.getHashtags() } 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe {view.setHashtags(it) } 

而且doOnUnsubscribe從來沒有被稱爲getHashtags()試圖以供選擇分貝由startTransaction()鎖定。僵局,呃。 好的。我們將doOnUnsubscribe(...)替換爲doOnTerminate(...)

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .map { notesMapper.transform(note) } 
      .flatMap { notesDataStore.put(it) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnTerminate { dbKeeper.endTransaction() } 
} 

但現在交易不會關閉,如果Observablesubscriber.unsubscribe()中斷。

你能推薦什麼來解決我的情況?

附加信息: 我使用一個writableDb實例來寫入/讀取數據。

回答

1

我會說這不適合Rx;我的建議是做使用通常的(在Java原諒)嘗試,最後的交易:

Observable<Note> put(Note note) { 
    return just(note) 
     .doOnNext(n -> { 
      validateNote(n); 
      dbKeeper.startTransaction(); 
      try { 
       storeHashtags(n); 
       storeImages(n); 
       notesDataStore.put(notesMapper.transform(n)); 
       dbKeeper.setTransactionSuccessful(); 
      } finally { 
       dbKeeper.endTransaction(); 
      } 
     }); 
} 

編輯:也許這是更爲有用:

fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> { 
    val latch = AtomicInteger(1) 
    val maybeEndTransaction = Action0 { 
     if (latch.decrementAndGet() == 0) { 
      endTransaction() 
     } 
    } 
    return Observable.empty<T>() 
      .doOnCompleted { startTransaction() } 
      .mergeWith(sourceObservable) 
      .doOnNext { setTransactionSuccessful() } 
      .doOnTerminate(maybeEndTransaction) 
      .doOnUnsubscribe(maybeEndTransaction) 
} 

使用方法如下:

override fun put(note: Note): Observable<Note> { 
    return Observable.just(note) 
     .doOnNext { validateNote(note) } 
     .doOnNext { storeHashtags(note) } 
     .doOnNext { storeImages(note) } 
     .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
     .map { notesMapper.transform(it) } 
     .compose { withTransaction } 
} 

它將確保交易結束一次;只要注意不要在原始可觀察鏈中切換線程(除非您的事務處理器能夠處理該線程,或者您修改了調度器以將新線程與現有事務相關聯)。

+0

是的,我已經理解Rx不適合這種情況。但是改變項目的架構爲時已晚。感謝您的解決方案。也許我會使用'BlockingObservable'作爲'notesDataStore.put()'和類似的方法。 – Alexandr

+1

我已添加編輯。 –

+0

看起來很有趣,我會嘗試一下。謝謝! – Alexandr