2017-09-05 63 views
0

我是新來的RX和卡住了下一個:RXJava2「無止境」可觀察

我使用SQLBrite當查詢方法被調用它給我回一個「無止境」可觀察監聽到數據庫變化。當一些項目被髮射 - 我需要在一個循環中做一些異步調用,並將我收到的項目給回去。示例代碼:

public Observable<List<Entity>> execute() { 
    return someManager.getAccount() 
      .take(1) // I do not need RX to trigger updates when account changed, so use take(1) 
      .flatMap(accountOptional -> { 
       if (accountOptional.isPresent()) { 
        SomeAccount account = accountOptional.get(); 
        return someManager.getEntityDataSource() 
          .query(new EntitySpecification(account.getId())); 
       } 
       return Observable.just(new ArrayList<Entity>()); // No account - no entities 
      }) 
      .flatMapIterable(entities -> entities) // Here I have a list of entities i I need to make async calls to fill entities with some iiner data (some other entities) 
      .flatMap(this::loadInnerData) 
      .toList() 
      .toObservable(); 
} 


private Observable<Entity> loadInnerData(Entity entity) { 
    return do some work with entity; 
    } 

麻煩的是當我使用toList() - 等待,直到可觀察到結束它的工作 - 但它不會被執行,因爲這是觀察到的聽着分貝。我怎樣才能實現傾聽的能力,不要停止聽RX的「可觀的」Observable和循環異步調用(因爲它是用loadInnerData()完成的)?

回答

0

也許與

return Observable.create(new Action1<Emitter<String>>() { 
     @Override 
     public void call(final Emitter<String> stringEmitter) { 
      // here u can use onNext/onError 
          stringEmitter.onNext(s); 

          stringEmitter.onError(e); 
         } 
        }); 

      stringEmitter.setCancellation(new Cancellable() { 
       @Override 
       public void cancel() throws Exception { 
        subscription.unsubscribe(); 
       } 
      }); 
     } 
    }, Emitter.BackpressureMode.BUFFER); 
嘗試