我希望創建一個返回observable<Location>
它,我可以發送一個新的位置和用戶獲取最後一個加入任何後續值LocationHandler類。RX的Java 2,可觀察到接受新值添加
我寫這個類,它的工作原理,但我不知道這是否是這樣做,因爲我已經添加了一個回調,我聞到不好的正確方法。
感謝您的任何幫助。
public class LocationHandler {
private MessageHandler<Location> onNewItem;
private Observable<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
locationObservable = getHookedObservable()
.mergeWith(locationInitBuilder.build())
.replay(1).autoConnect();
}
private Observable<Location> getHookedObservable() {
return Observable.create(new ObservableOnSubscribe<Location>() {
@Override
public void subscribe(ObservableEmitter<Location> e) throws Exception {
onNewItem = location -> e.onNext(location);
}
});
}
public Observable<Location> getLocation(){
return locationObservable;
}
public void setLocation(Location address){ // <---------- add new values
if (onNewItem != null){
onNewItem.handleMessage(address);
} else {
throw new IllegalStateException("Cannot add an item to a never subscribed stream");
}
}
}
下面我用一個ReplaySubject修改了它@Blackbelt建議。
public class LocationHandler {
private ReplaySubject<Location> inputStream = ReplaySubject.create(1);
private Observable<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
locationObservable = locationInitBuilder.build()
.mergeWith(inputStream)
.replay(1).autoConnect();
}
public Observable<Location> getLocation(){
return locationObservable;
}
public void setLocation(Location address){
inputStream.onNext(address);
}
}
其實我不能,因爲它是一個流通過我得到LocationInitializationBuilder.build使用行爲。行爲需要一個我無法在創作時提供的明確價值。 –
是的,我看到問題出在哪裏。您的解決方案看起來合法 –
'BehaviourSubject'有一個靜態方法'create',它創建一個空的'BehaviourSubject' – Blackbelt