2015-09-17 42 views
3

我使用了Android RX java和我的事件總線類妥善處理的onError我EventBus是如下如何使用RX的Java

public class EventBus { 
private final Subject<Event, Event> subject = new SerializedSubject<>(PublishSubject.<Event>create()); 
private Observable<Map<Type, Event>> stickyObservable; 

public EventBus() { 
    createStickyObservable(); 
} 

private void createStickyObservable() { 
    final List<Observable<Event>> observables = new ArrayList<>(); 

    final Observable<Map<Type, Event>> so = subject 
      .filter(event -> event.sticky) 
      .groupBy(event -> event.type) 
      .switchMap(groupedObservable -> { 
       BehaviorSubject<Event> bs = BehaviorSubject.create(); 
       groupedObservable.subscribe(bs); 
       observables.add(bs); 
       return Observable.combineLatest(observables, args -> { 
        Map<Type, Event> map = new HashMap<>(); 
        for (Object arg : args) { 
         Event event = (Event) arg; 
         map.put(event.type, event); 
        } 

        return map; 
       }); 
      }); 

    final BehaviorSubject<Map<Type, Event>> bs = BehaviorSubject.create(); 
    so.subscribe(bs); 
    stickyObservable = bs; 
} 

public Observable<Event> filter(final String pathExpression) { 
    final Pattern pattern = Pattern.compile(pathExpression); 

    return subject.filter(event -> { 
     if (event.path == null) { 
      return pathExpression == null; 
     } 
     return pattern.matcher(event.path).matches(); 
    }); 
} 

public Observable<Map<Type, Event>> getStickyObservable() { 
    return stickyObservable; 
} 


public void event(Event event) { 
    subject.onNext(event); 
} 

}

我得到了很多錯誤日誌與rx.exceptions.OnErrorNotImplementedException:

我該如何解決這個問題?請建議我解決此問題

回答

3

當您訂閱getStickyObservable()時,您需要實施onError方法(不要僅使用例如.subscribe(action)過載)。

+0

謝謝戴夫:)它爲我工作 – Renjith