2016-11-08 71 views
0

我希望創建一個返回observable<Location>它,我可以發送一個新的位置和用戶獲取最後一個加入任何後續值LocationHandler類。RX的Java 2,可觀察到接受新值添加

我寫這個類,它的工作原理,但我不知道這是否是這樣做,因爲我已經添加了一個回調,我聞到不好的正確方法。

感謝您的任何幫助。

public class LocationHandler { 
    private MessageHandler<Location> onNewItem; 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = getHookedObservable() 
       .mergeWith(locationInitBuilder.build()) 
       .replay(1).autoConnect(); 
    } 


    private Observable<Location> getHookedObservable() { 
     return Observable.create(new ObservableOnSubscribe<Location>() { 
      @Override 
      public void subscribe(ObservableEmitter<Location> e) throws Exception { 
       onNewItem = location -> e.onNext(location); 
      } 
     }); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ // <---------- add new values 
     if (onNewItem != null){ 
      onNewItem.handleMessage(address); 
     } else { 
      throw new IllegalStateException("Cannot add an item to a never subscribed stream"); 
     } 
    } 
} 

下面我用一個ReplaySubject修改了它@Blackbelt建議。

public class LocationHandler { 
    private ReplaySubject<Location> inputStream = ReplaySubject.create(1); 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = locationInitBuilder.build() 
       .mergeWith(inputStream) 
       .replay(1).autoConnect(); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ 
     inputStream.onNext(address); 
    } 
} 

回答

2

你可以使用的,而不是SubjectMessageHandler。主體可以同時作爲可觀察用戶和訂戶。您可以在您的LocationHandler中使用返回Subject#asObservable的方法,您將訂閱該方法。在內部,當setLocation,你將不得不調用Subject#onNext提供的位置。有不同類型的主題可用。請參閱文檔以選擇更適合您需求的文檔。例如。

public class LocationHandler { 
    BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create(); 

    public Observable<GeevLocation> getLocation() { 
     return mLocationSubject.asObservable(); 
    } 

    public void setLocation(GeevLocation address){ 
     mLocationSubject.onNext(address); 
    } 
} 

從外面呼叫getLocation和訂閱返回Observable。當一個setLocation被稱爲你會得到的對象onNext

1

作爲黑帶已經告訴你,你會使用一個主題。特別是我會使用BehaviorSubject。主題默認爲熱門,但他們可以通過訂閱重播活動。如果您訂閱,BehaviorSubject會爲您提供最後發佈的值或初始值。每個用戶都會得到這些值。流將永遠不會完成,因爲它很熱。請記住處理錯誤,因爲第二個onError將被吞噬。

例碼

class Location { 

} 

class LocationInitializationBuilder { 
    static Location build() { 
     return new Location(); 
    } 
} 

class LocationHandler { 
    private Subject<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     Location initialValue = LocationInitializationBuilder.build(); 

     locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized(); 
    } 

    public Observable<Location> getLocation() { 
     return locationObservable.hide(); 
    } 

    public void setLocation(Location address) { // <---------- add new values 
     locationObservable.onNext(address); 
    } 
} 

public class LocationTest { 
    @Test 
    public void name() throws Exception { 
     LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder()); 

     TestObserver<Location> test = locationHandler.getLocation().test(); 

     locationHandler.setLocation(new Location()); 

     test.assertValueCount(2); 
    } 
} 
+0

其實我不能,因爲它是一個流通過我得到LocationInitializationBuilder.build使用行爲。行爲需要一個我無法在創作時提供的明確價值。 –

+0

是的,我看到問題出在哪裏。您的解決方案看起來合法 –

+1

'BehaviourSubject'有一個靜態方法'create',它創建一個空的'BehaviourSubject' – Blackbelt