我是很新,RxJava和有圖案等 我使用下面的代碼創建一個可觀察到的一些問題:RxJava錯誤處理熱點觀察到
public Observable<Volume> getVolumeObservable(Epic epic) {
return Observable.create(event -> {
try {
listeners.add(streamingAPI.subscribeForChartCandles(epic.getName(), MINUTE, new HandyTableListenerAdapter() {
@Override
public void onUpdate(int i, String s, UpdateInfo updateInfo) {
if (updateInfo.getNewValue(CONS_END).equals(ONE)) {
event.onNext(new Volume(Integer.parseInt(updateInfo.getNewValue(LAST_TRADED_VOLUME))));
}
}
}));
} catch (Exception e) {
LOG.error("Error from volume observable", e);
}
});
}
一切工作正常,但我有一些關於錯誤處理的問題。 如果我理解正確,這將被視爲「熱點觀察」,即無論訂閱與否,事件都會發生(onUpdate是由我無法控制的遠程服務器使用的回調)。
我選擇不要在這裏調用onError,因爲我不希望observable在單個異常情況下停止發射事件。有沒有更好的模式可供使用? .retry()出現在腦海中,但我不確定這是否適合熱門的可觀察性?
另外,在創建訂閱時,但在第一個onNext被調用之前,observable如何表示?它只是一個Observable.empty()
你認爲錯誤來自哪裏?從'listeners.add()'或者'onUpdate()'?你能給出一個錯誤情況的例子,你想要通知訂閱者。 –
我猜你有點誤解熱/冷Observable。這並不熱,每個用戶都有自己的監聽器來發送事件。即使你沒有註銷你的聽衆處置。由於Observable.create機制,observable在處置後不會發出事件。 –
可能是listeners.add()和onUpdate()。不幸的是,我使用的API是非常不明確的。 – Daniel