2015-05-02 45 views
1

我編碼ObservableList根據PublishSubject。每當客戶端將一個元素添加到此列表中時,觀察者將通過主題上的onNext方法得到通知。單個Observable是否會按順序通知所有觀察者?

我已經編碼了兩個樣本觀察者並將它們訂閱到ObservableList。我注意到通知是連續的並且被阻塞。因此,如果我有這兩名觀察員訂閱,則會發生以下情況:

  • 觀察者將按照訂閱的相同順序收到通知。
  • 如果觀察者阻塞,那麼只有當前一個觀察者結束其執行時才通知下一個觀察者。

我真的不在乎ID調用是連續的,但想了解爲什麼通知阻塞以及如何使它不被阻塞。

下面是ObservableList

package co.com.subjects.example; 

import java.util.ArrayList; 
import java.util.List; 

import rx.Observable; 
import rx.functions.Action1; 
import rx.subjects.PublishSubject; 

public class ObservableList<T>{ 

    public String nombre; 
    protected final List<T> list; 
    protected final PublishSubject<T> onAdd; 

    public ObservableList(String nombre) { 
     this.list = new ArrayList<T>(); 
     this.onAdd = PublishSubject.create(); 
     this.nombre = nombre; 
    } 

    public void add(T value) { 
     list.add(value); 
     onAdd.onNext(value); 
    } 

    public Observable<T> getObservable() { 
     return onAdd; 
    } 
} 

回答

2

想知道爲什麼通知阻塞

RxJava的實現假定執行觀察者的onNext方法始終是快速和便宜,所以PublishSubject只是調用它的觀察員之一的全部onNext方法之後,沒有引入任何併發。

,以及我怎樣才能使它不被阻塞

訂閱你的觀察家之前,你可以插入.observeOn(Schedulers.computation())(或不同的調度,根據您的需求),從而使onNext電話在線程池中執行。

1

代碼,您可以通過在通知其在ExecutorService的觀察員包裝觀察員使其不阻塞。

顯然,將任務添加到另一個池是相當昂貴的,所以我只會在知道需要一段時間時纔會這樣做。另請注意,除非您非常小心,否則以這種方式發佈的事件可能會失靈。一個簡單的解決方案是爲每個監聽器配置一個線程執行程序。