2015-11-09 35 views
1

我有CompositeSubscription,還有我添加訂閱與ReplaySubject如何使用ReplaySubject重新運行CompositeSubscription?

CompositeSubscription compositeSubscription = new CompositeSubscription(); 
    ReplaySubject subject = ReplaySubject.create(); 

compositeSubscription.add(
       manager.getAllContacts() 
       .toList() 
       .doOnNext(new Action1<List<Person>>() { 
        @Override 
        public void call(List<Person> persons) { 
         allPersons = persons; 
         Log.e(TAG, "BookContacts: " + "allPersons = " + allPersons.size()); 
         setupViewPager(); 
        } 
       }) 
       .subscribe(subject)); 

然後我添加第二認購本ReplaySubject

compositeSubscription.add(Observable.combineLatest(subject, 
                  (PublishSubject<List<CustomUser>>) execute(
                    manager.getDigitsContacts()), 
                  new Func2<List<Person>, List<CustomUser>, Object>() { 
                   @Override 
                   public Object call(List<Person> persons, List<CustomUser> customUsers) { 
                    //... my code with persons and customUsers... 
                    return null; 
                   } 
                  }) 
              .subscribe()); 

代碼工作,即完成ReplaySubject hasCompleted = TRUE後。

,但是當我嘗試添加第三預訂,它不叫「呼叫()」方法

compositeSubscription.add(Observable.combineLatest(subject, 
                  (PublishSubject<List<CustomUser>>) execute(
                    manager.getFacebookContacts()), //<-----manager.getFacebookContacts() is run, but doesn't call call() method 
                  new Func2<List<Person>, List<CustomUser>, Object>() { 
                   @Override 
                   public Object call(List<Person> persons, List<CustomUser> customUsers) { 
                    //...this method is not called after "manager.getFacebookContacts()" 
                    return null; 
                   } 
                  }) 
              .subscribeOn(Schedulers.newThread()) 
              .subscribe()); 

如何解決呢?......因爲如果我同時添加訂閱它工作正常。

+0

在第一代碼片段,它看起來像,而不是'做的副作用doOnNext'你需要的所有聯繫人的'Observable'。然後您可以訂閱它並相應地更新視圖尋呼機。可能你想'BehaviorSubject'而不是'ReplySubject'。我很難理解你想達到什麼,請描述一下。 – pawelzieba

回答

3

請問您可以添加錯誤回調到.subscribe()?我的猜測是,第三次,ReplaySubject溢出combineLatest的緩衝區。除了創建主題,您應該使用.replay().autoConnect(0)

CompositeSubscription compositeSubscription = new CompositeSubscription();

Observable<List<Person>> persons = manager.getAllContacts() 
    .toList() 
    .doOnNext(new Action1<List<Person>>() { 
     @Override 
     public void call(List<Person> persons) { 
      allPersons = persons; 
      Log.e(TAG, "BookContacts: " + "allPersons = " + allPersons.size()); 
      setupViewPager(); 
     } 
    }).replay().autoConnect(0, s -> compositeSubscription.add(s)); 

然後使用persons代替subject

相關問題