2017-04-27 88 views
0

我有一個問題,它是簡單的商業邏輯流程:RxJava訂閱副作用

檢查在多個部門,部門與員工關係的員工是否在緩存中,首先檢查關係是否存在於緩存中,如果檢查員工是否屬於 ,如果不在緩存中,則從數據庫中獲取,並檢查員工關係,然後將部門信息保存到緩存。

這是代碼:

public Observable isEmployeeInDepartment(List<Long> departmentIds, long employeeId){ 

    //this observable will resolve twice, and cause unnecessary cache access 
    Observable departmentInfoExsitInCache= checkDepartmentInfoFromCache(...).share(); 

    Observable departInfoNotInCache = departmentInfoExsitInCache.filter(...); 

    //this observable will resolve twice, and cause unnecessary database access 
    Observable departmentInfoFromDb=departInfoNotInCache.flatMap(departmentIds->checkFromDb()).share(); 

    Observable<Long> saveResult=departmentInfoFromDb.flatMap(departmentInfo->saveToCache()); 

    Observable<Long> departInfoInCache = departmentInfoExsitInCache.filter(...); 

    return departInfoInCache.check(userId).merge(departmentInfoFromDb.check(userId)).doOnCompleted(saveResult.subscribe()); 
} 

問題是,departmentInfoExsitInCache和saveResult將由客戶一次認購方法兩次解決。

我發現一旦刪除保存訂閱代碼.doOnCompleted(saveResult.subscribe()),它將變爲正常並且只能解析一次。這段代碼有什麼問題嗎?

回答

0

您在這兒濫用分享。
問題是share()在這種情況下不會幫助你。 share()實際上是publish().autoConnect(),意思是保持對流的單個訂閱,因此再次調用訂閱將不會再次調用訂閱邏輯,而只會將您連接到現有流。
但是,在所有訂閱者退訂共享流之後,Observable將取消訂閱,這意味着當您再次撥打subscribe()時,您將調用訂閱邏輯並再次調用DB/Cache。

因此,您要做的是在取消訂閱後再次訂閱共享操作員。 (在doOnCompleted()中),這將導致departmentInfoFromDbdepartmentInfoExsitInCache再次預訂並轉至DB/Cache。

考慮使用cache()/reply()運算符在訂閱之間持久從DB/Cache獲取的值。

+0

其實,我不知道有什麼方法使'''saveResult'''得到解決時,整個方法的返回的可觀測得到下標,除了把它放在「主」返回觀察到的'''doOnCompleted() '''方法,使其下標以及 –

+0

如果你想'saveResult'到合併'Obesrvable'後,在最後一行訂閱,您可以用'CONCAT()'操作。但無論如何它不會解決這個問題 – yosriz