我想跟蹤訂閱者開始消費事件和完成時的情況。 是否有任何通用的方式,將適用於所有Observable/Subscribers?我可以跟蹤RxJava訂戶中事件的消耗情況嗎?
1
A
回答
6
是1.x還是2.x?對於2.x而言,它會變得相當複雜,因爲所有內部協議都必須考慮到不會意外地降低流量。
否則,它可以像寫那纔是真正Observer
和運營商之間勻一個Observer
簡單:
import io.reactivex.Observer;
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
if (!observable.getClass().getName().toLowerCase().contains("map")) {
return observer;
}
System.out.println("Started");
class SignalTracker implements Observer<Object>, Disposable {
Disposable upstream;
@Override public void onSubscribe(Disposable d) {
upstream = d;
// write the code here that has to react to establishing the subscription
observer.onSubscribe(this);
}
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onComplete() {
// handle onComplete
System.out.println("Completed");
observer.onComplete();
}
@Override public void dispose() {
// handle dispose
upstream.dispose();
}
@Override public boolean isDisposed() {
return upstream.isDisposed();
}
}
return new SignalTracker();
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
打印:
Started
3
6
9
12
15
Completed
編輯:的RxJava 1版需要更多lambdas但可行:
RxJavaHooks.setOnObservableStart((observable, onSubscribe) -> {
if (!onSubscribe.getClass().getName().toLowerCase().contains("map")) {
return onSubscribe;
}
System.out.println("Started");
return (Observable.OnSubscribe<Object>)observer -> {
class SignalTracker extends Subscriber<Object> {
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onCompleted() {
// handle onComplete
System.out.println("Completed");
observer.onCompleted();
}
@Override public void setProducer(Producer p) {
observer.setProducer(p);
}
}
SignalTracker t = new SignalTracker()
observer.add(t);
onSubscribe.call(t);
};
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
相關問題
- 1. 我可以跟蹤HTML中錨點中的多個事件嗎?
- 2. DDHotKey - 在不取消事件的情況下跟蹤
- 3. HTTPHandler跟蹤下載可以取消嗎?
- 4. 我們應該跟蹤代碼以外的其他事情嗎?
- 5. 跟蹤C++中的內存使用情況並評估內存消耗
- 6. 我可以一次跟蹤多個Google Analytics事件嗎?
- 7. 我可以使用Google Analytics來跟蹤Greasemonkey腳本的使用情況嗎?
- 8. 消費「Windows事件跟蹤」事件
- 9. 可以跟蹤github拉嗎?
- 10. 跟蹤android app的電池消耗量?
- 11. 振幅可以跟蹤鍵盤擴展的事件嗎?
- 12. php跟蹤隨機情況
- 13. RSS使用情況跟蹤
- 14. 我可以取消DataGridView.CellMouseDoubleClick事件嗎?
- 15. 跟蹤iOS UIWebView內存消耗
- 16. 跟蹤線程內存和CPU消耗
- 17. BTrace - 它可以跟蹤按鈕點擊事件嗎?
- 18. Visual Studio可以跟蹤所有事件嗎?
- 19. 我可以在Google Analytics中跟蹤我的Firebase功能嗎?
- 20. 在不消耗任何觸摸事件的情況下收聽滑動事件
- 21. 跟蹤Google Analytics中活動用戶的應用事件嗎?
- 22. 我可以跟蹤來自'index.php'的請求頁面/文件嗎?
- 23. 事件跟蹤
- 24. WPF中的事件消耗
- 25. 如何跟蹤用戶/服務帳戶的BigQuery使用情況
- 26. 用於IE7的hashchange polyfill,可以跟蹤每個訂單的多個事件
- 27. 用戶可以一起購買消耗品和非消耗品嗎?
- 28. Qt中的跟蹤事件
- 29. 我可以在沒有用戶的情況下使用oauth2嗎?
- 30. 我可以在不附加VS調試器的情況下從CLR異常中獲取堆棧跟蹤嗎?
有一個'onSubscribe(一次性)'方法將被調用一次Observable開始發射事件。而當onComplete()'/'onError被調用意味着它終止。 –
謝謝,但我在尋找像RxJavaHooks這樣的通用解決方案。所以我不需要訪問每個observable。在當前的jvm實例中,它應該像任何可見的間諜一樣工作。 – smalafeev
我明白了。你可以在RxJavaPlugin中使用'setOnObservableSubscribe'。 –