2016-05-23 109 views
1

我嘗試使用Rx Realm複製數據庫觸發器函數。一旦我釋放RealmList,我會用它做一些事情並保存。可悲的是,這導致Realm的更改監聽器再次執行,一遍又一遍地發送列表。更改監聽器中的領域更改

假人例如:

realm.where(MyRealmObject.class) 
      .equalTo("state", "new") 
      .findAll() 
      .asObservable() 
      .flatMap(new Func1<RealmResults<MyRealmObject>, Observable<MyRealmObject>>() { 
       @Override 
       public Observable<MyRealmObject> call(RealmResults<MyRealmObject> list) { 
        return Observable.from(list); 
       } 
      }) 
      .subscribe(new Action1<MyRealmObject>() { 
       @Override 
       public void call(final MyRealmObject object) { 
        realm.executeTransaction(new Realm.Transaction() { 
         @Override 
         public void execute(Realm realm) { 
          // do any realm change 
         } 
        }); 
       } 
      }); 

一旦我犯訂戶事務,新RealmList從可觀察到的emited。我知道爲什麼發生這種情況,我只是沒有看到如何解決這個問題。

這將我們帶到我的問題。有沒有辦法如何複製觸發功能的領域,我會做任何領域的變化?

回答

0

可以使用幫助程序流構建解決方法,以確定是否應該消耗來自數據庫的下一項。每個數據存儲到數據庫應該伴隨寫入助手流。下面收益率運行試驗:

upstream: IgnoreAction{action='start', ignoreNext=false} 
result: 1 
result: 2 
result: 3 
upstream: IgnoreAction{action='1', ignoreNext=true} 
upstream: IgnoreAction{action='2', ignoreNext=true} 
upstream: IgnoreAction{action='3', ignoreNext=true}  

因此,第一數據(「開始」)被消耗,而在onNext寫入觸發被忽略。

@Test 
public void rxIgnore() throws Exception { 

MockDb mockDb = new MockDb(); 

BehaviorSubject<Boolean> ignoreNextStream = BehaviorSubject.create(false); 
Observable<String> dataStream = mockDb.dataSource(); 

dataStream.zipWith(ignoreNextStream, Data::new) 
     .doOnNext(action -> System.out.println("upstream: " + action)) 
     .filter(Data::isTakeNext) 
     .flatMap(__ -> Observable.just(1, 2, 3)) 
     .subscribe(new Observer<Integer>() { 
      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(Throwable e) { 

      } 

      @Override 
      public void onNext(Integer val) { 
       System.out.println("result: " + val); 
       ignoreNextStream.onNext(true); 
       mockDb.data(String.valueOf(val)); 
      } 
     }); 

mockDb.data("start"); 

Observable.empty().delay(1, TimeUnit.MINUTES).toBlocking().subscribe(); 
} 

private static class Data { 
private final String action; 
private final boolean ignoreNext; 

public Data(String action, boolean ignoreNext) { 
    this.action = action; 
    this.ignoreNext = ignoreNext; 
} 

public boolean isTakeNext() { 
    return !ignoreNext; 
} 

@Override 
public String toString() { 
    return "IgnoreAction{" + 
      "action='" + action + '\'' + 
      ", ignoreNext=" + ignoreNext + 
      '}'; 
} 
} 

private static class MockDb { 

private final Subject<String, String> subj = PublishSubject.<String>create() 
              .toSerialized(); 

public void data(String action) { 
    subj.onNext(action); 
} 

Observable<String> dataSource() { 
    return subj; 
} 
}