2017-05-29 55 views
1

我想跟蹤訂閱者開始消費事件和完成時的情況。 是否有任何通用的方式,將適用於所有Observable/Subscribers?我可以跟蹤RxJava訂戶中事件的消耗情況嗎?

+0

有一個'onSubscribe(一次性)'方法將被調用一次Observable開始發射事件。而當onComplete()'/'onError被調用意味着它終止。 –

+0

謝謝,但我在尋找像RxJavaHooks這樣的通用解決方案。所以我不需要訪問每個observable。在當前的jvm實例中,它應該像任何可見的間諜一樣工作。 – smalafeev

+0

我明白了。你可以在RxJavaPlugin中使用'setOnObservableSubscribe'。 –

回答

6

是1.x還是2.x?對於2.x而言,它會變得相當複雜,因爲所有內部協議都必須考慮到不會意外地降低流量。

否則,它可以像寫那纔是真正Observer和運營商之間勻一個Observer簡單:

import io.reactivex.Observer; 

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { 
    if (!observable.getClass().getName().toLowerCase().contains("map")) { 
     return observer; 
    } 

    System.out.println("Started"); 

    class SignalTracker implements Observer<Object>, Disposable { 
     Disposable upstream; 
     @Override public void onSubscribe(Disposable d) { 
      upstream = d; 
      // write the code here that has to react to establishing the subscription 
      observer.onSubscribe(this); 
     } 
     @Override public void onNext(Object o) { 
      // handle onNext before or aftern notifying the downstream 
      observer.onNext(o); 
     } 
     @Override public void onError(Throwable t) { 
      // handle onError 
      observer.onError(t); 
     } 
     @Override public void onComplete() { 
      // handle onComplete 
      System.out.println("Completed"); 
      observer.onComplete(); 
     } 
     @Override public void dispose() { 
      // handle dispose 
      upstream.dispose(); 
     } 
     @Override public boolean isDisposed() { 
      return upstream.isDisposed(); 
     } 
    } 
    return new SignalTracker(); 
    }); 

    Observable<Integer> observable = Observable.range(1, 5) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.computation()) 
     .map(integer -> { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     return integer * 3; 
     }); 

    observable.subscribe(System.out::println); 

    Thread.sleep(6000L); 

打印:

Started 
3 
6 
9 
12 
15 
Completed 

編輯:的RxJava 1版需要更多lambdas但可行:

RxJavaHooks.setOnObservableStart((observable, onSubscribe) -> { 
    if (!onSubscribe.getClass().getName().toLowerCase().contains("map")) { 
     return onSubscribe; 
    } 

    System.out.println("Started"); 

    return (Observable.OnSubscribe<Object>)observer -> { 
     class SignalTracker extends Subscriber<Object> { 
      @Override public void onNext(Object o) { 
       // handle onNext before or aftern notifying the downstream 
       observer.onNext(o); 
      } 
      @Override public void onError(Throwable t) { 
       // handle onError 
       observer.onError(t); 
      } 
      @Override public void onCompleted() { 
       // handle onComplete 
       System.out.println("Completed"); 
       observer.onCompleted(); 
      } 
      @Override public void setProducer(Producer p) { 
       observer.setProducer(p); 
      } 
     } 
     SignalTracker t = new SignalTracker() 
     observer.add(t); 
     onSubscribe.call(t); 
    }; 
    }); 

    Observable<Integer> observable = Observable.range(1, 5) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.computation()) 
     .map(integer -> { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     return integer * 3; 
     }); 

    observable.subscribe(System.out::println); 

    Thread.sleep(6000L); 
+0

我在說明中添加了示例。該解決方案不適用於它。 – smalafeev

+0

你是什麼意思「不起作用?」。 – akarnokd

+0

在用戶完成所有事件處理之前打印「已完成」。 – smalafeev

相關問題