2017-04-11 113 views
1

我是很新,RxJava和有圖案等 我使用下面的代碼創建一個可觀察到的一些問題:RxJava錯誤處理熱點觀察到

public Observable<Volume> getVolumeObservable(Epic epic) { 
     return Observable.create(event -> { 
      try { 
       listeners.add(streamingAPI.subscribeForChartCandles(epic.getName(), MINUTE, new HandyTableListenerAdapter() { 
        @Override 
        public void onUpdate(int i, String s, UpdateInfo updateInfo) { 
         if (updateInfo.getNewValue(CONS_END).equals(ONE)) { 
          event.onNext(new Volume(Integer.parseInt(updateInfo.getNewValue(LAST_TRADED_VOLUME)))); 
         } 
        } 
       })); 
      } catch (Exception e) { 
       LOG.error("Error from volume observable", e); 
      } 
     }); 
    } 

一切工作正常,但我有一些關於錯誤處理的問題。 如果我理解正確,這將被視爲「熱點觀察」,即無論訂閱與否,事件都會發生(onUpdate是由我無法控制的遠程服務器使用的回調)。

我選擇不要在這裏調用onError,因爲我不希望observable在單個異常情況下停止發射事件。有沒有更好的模式可供使用? .retry()出現在腦海中,但我不確定這是否適合熱門的可觀察性?

另外,在創建訂閱時,但在第一個onNext被調用之前,observable如何表示?它只是一個Observable.empty()

+0

你認爲錯誤來自哪裏?從'listeners.add()'或者'onUpdate()'?你能給出一個錯誤情況的例子,你想要通知訂閱者。 –

+1

我猜你有點誤解熱/冷Observable。這並不熱,每個用戶都有自己的監聽器來發送事件。即使你沒有註銷你的聽衆處置。由於Observable.create機制,observable在處置後不會發出事件。 –

+0

可能是listeners.add()和onUpdate()。不幸的是,我使用的API是非常不明確的。 – Daniel

回答

2

1)你的可觀察性不是。區分因素是多個訂戶是否共享相同的訂閱。 Observable.create()爲每個用戶調用訂閱功能,即它是

雖然很容易使它。只需添加share()運營商。它將訂閱第一位訂閱者並取消訂閱最後一位訂閱者。不要忘記實現退訂功能,像這樣的東西:

event.setCancellable(() -> listeners.remove(...)); 

2)錯誤可能是可恢復無法恢復

如果你認爲一個錯誤是可以自我修復的(不需要你的行爲),你不應該打電話onError,因爲這會殺死你的可觀察的(不會發生進一步的事件)。您可能會通過發出特殊的Volume消息並附上錯誤詳細信息來通知您的訂戶。

如果錯誤是致命的,例如你沒有添加監聽器,所以可能沒有更多的消息,你不應該默默地忽略這個。發射onError因爲你的可觀察性無論如何都不起作用。

如果錯誤需要您採取措施,通常是重試或超時重試,您可以添加retryXxx()運算符中的一個。在create()之後執行此操作,但在之前執行share()

3)Observable是具有subscribe()方法的對象。它是如何表示取決於你創建它的方法。例如,請參閱源代碼create()