2015-09-22 36 views
7

假設我有以下RxJava代碼(訪問一個數據塊,但具體使用情況是不相關的):在RxJava中取消訂閱線程安全嗎?

public Observable<List<DbPlaceDto>> getPlaceByStringId(final List<String> stringIds) { 
    return Observable.create(new Observable.OnSubscribe<List<DbPlaceDto>>() { 
     @Override 
     public void call(Subscriber<? super List<DbPlaceDto>> subscriber) { 
      try { 
       Cursor c = getPlacseDb(stringIds); 

       List<DbPlaceDto> dbPlaceDtoList = new ArrayList<>(); 
       while (c.moveToNext()) { 
        dbPlaceDtoList.add(getDbPlaceDto(c)); 
       } 
       c.close(); 

       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(dbPlaceDtoList); 
        subscriber.onCompleted(); 
       } 
      } catch (Exception e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 
     } 
    }); 
} 

鑑於此代碼,我有以下問題:

  1. 如果有人取消訂閱從此方法返回的觀察值(之前的訂閱),是否該操作是線程安全的?無論調度如何,我的'isUnsubscribed()'檢查是否正確?

  2. 是否有一個更簡潔的方式與更少的樣板代碼來檢查未訂閱的狀態比我在這裏使用?我在框架中找不到任何東西。我認爲SafeSubscriber解決了用戶未訂閱時不轉發事件的問題,但顯然它沒有。

回答

8

是,操作線程安全的嗎?

是。您正在接收一個rx.Subscriber(最終)檢查訂閱者訂閱未訂閱時設置爲true的volatile布爾值。

用更少的樣板代碼清潔器的方法來檢查未訂閱狀態

SyncOnSubscribeAsyncOnSubscribe(作爲@Experimental API作爲釋放1.0.15的)用於該用途的情況下被創建。它們可以作爲調用Observable.create的安全替代方案。這是同步案例的一個(人爲的)例子。

public static class FooState { 
    public Integer next() { 
     return 1; 
    } 
    public void shutdown() { 

    } 
    public FooState nextState() { 
     return new FooState(); 
    } 
} 
public static void main(String[] args) { 
    OnSubscribe<Integer> sos = SyncOnSubscribe.createStateful(FooState::new, 
      (state, o) -> { 
       o.onNext(state.next()); 
       return state.nextState(); 
      }, 
      state -> state.shutdown()); 
    Observable<Integer> obs = Observable.create(sos); 
} 

注意,SyncOnSubscribe下一個功能是不允許調用不止一次observer.onNext以上每次迭代也不能同時打電話到該觀測。以下是1.x分支頭上的​​SyncOnSubscribeimplementationtests的幾個鏈接。它的主要用途是簡化編寫可觀測數據,以同步和onNext方式對數據進行迭代或解析,但在支持背壓和檢查是否退訂的框架中這樣做。實質上,您可以創建一個next函數,每次下游運算符需要一個新的數據元素onNexted時,該函數都會被調用。你的下一個函數可以在0或1次的時候調用onNext。

AsyncOnSubscribe設計用於良好地反壓異步操作的可觀察源(如離箱呼叫)的背壓。您的下一個函數的參數包括請求計數,並且您提供的可觀察值應該提供一個可觀察的數據,以滿足所請求數量的數據。這種行爲的一個例子是來自外部數據源的分頁查詢。

此前,將您的OnSubscribe轉換爲Iterable並使用Observable.from(Iterable)是安全做法。此實現獲取迭代器併爲您檢查subscriber.isUnsubscribed()

+0

謝謝,您實際上回答了我的另一個關於創建具有適當背壓支持的「自定義」可觀察的問題!我檢查了SyncSubscriber,它看起來非常好。在很多情況下,我會將操作轉換爲Iterable在語義上有點尷尬,但是知道我們可以通過這種方式輕鬆實現背壓支持,這仍然很好。我發現這個班級仍然被標記爲@Experimental,你認爲它什麼時候可以粗略地考慮到生產準備? –

+0

好聽!下一步是將其發佈到一個版本中(1.0.15版即將推出)。之後,一般情況下,它會進入'@ Beta'狀態或直接進入公共狀態,因爲我們會獲得信心。 – Aaron